ctx.cpp 18.9 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#ifndef ZMQ_HAVE_WINDOWS
33 34 35
#include <unistd.h>
#endif

36
#include <limits>
37
#include <climits>
38
#include <new>
f18m's avatar
f18m committed
39
#include <sstream>
40
#include <string.h>
41

42
#include "ctx.hpp"
43
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
44
#include "io_thread.hpp"
45
#include "reaper.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
46
#include "pipe.hpp"
47 48
#include "err.hpp"
#include "msg.hpp"
49
#include "random.hpp"
50

Ilya Kulakov's avatar
Ilya Kulakov committed
51 52 53 54
#ifdef ZMQ_HAVE_VMCI
#include <vmci_sockets.h>
#endif

55 56 57
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
#define ZMQ_CTX_TAG_VALUE_BAD  0xdeadbeef

58
int clipped_maxsocket (int max_requested)
59
{
Richard Newton's avatar
Richard Newton committed
60
    if (max_requested >= zmq::poller_t::max_fds () && zmq::poller_t::max_fds () != -1)
Pieter Hintjens's avatar
Pieter Hintjens committed
61 62
        // -1 because we need room for the reaper mailbox.
        max_requested = zmq::poller_t::max_fds () - 1;
Martin Hurton's avatar
Martin Hurton committed
63

64 65 66
    return max_requested;
}

67
zmq::ctx_t::ctx_t () :
68
    tag (ZMQ_CTX_TAG_VALUE_GOOD),
69 70 71 72 73
    starting (true),
    terminating (false),
    reaper (NULL),
    slot_count (0),
    slots (NULL),
Richard Newton's avatar
Richard Newton committed
74
    max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
75
    max_msgsz (INT_MAX),
Pieter Hintjens's avatar
Pieter Hintjens committed
76
    io_thread_count (ZMQ_IO_THREADS_DFLT),
77
    blocky (true),
78 79
    ipv6 (false),
    thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
80 81
    thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT),
    thread_affinity (ZMQ_THREAD_AFFINITY_DFLT)
Martin Sustrik's avatar
Martin Sustrik committed
82
{
83 84 85
#ifdef HAVE_FORK
    pid = getpid();
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
86 87 88 89
#ifdef ZMQ_HAVE_VMCI
    vmci_fd = -1;
    vmci_family = -1;
#endif
90

91 92
    //  Initialise crypto library, if needed.
    zmq::random_open ();
93 94
}

95 96
bool zmq::ctx_t::check_tag ()
{
97
    return tag == ZMQ_CTX_TAG_VALUE_GOOD;
98 99
}

100
zmq::ctx_t::~ctx_t ()
Martin Sustrik's avatar
Martin Sustrik committed
101
{
102
    //  Check that there are no remaining sockets.
103 104
    zmq_assert (sockets.empty ());

105 106
    //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
    //  thread subsequent invocation of destructor would hang-up.
107
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
Martin Sustrik's avatar
Martin Sustrik committed
108
        io_threads [i]->stop ();
109
    }
Martin Sustrik's avatar
Martin Sustrik committed
110 111

    //  Wait till I/O threads actually terminate.
112 113 114
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        LIBZMQ_DELETE(io_threads [i]);
    }
Martin Sustrik's avatar
Martin Sustrik committed
115

116
    //  Deallocate the reaper thread object.
117
    LIBZMQ_DELETE(reaper);
118

119 120
    //  Deallocate the array of mailboxes. No special work is
    //  needed as mailboxes themselves were deallocated with their
121
    //  corresponding io_thread/socket objects.
122
    free (slots);
123

124 125
    //  De-initialise crypto library, if needed.
    zmq::random_close ();
126

127
    //  Remove the tag, so that the object is considered dead.
128
    tag = ZMQ_CTX_TAG_VALUE_BAD;
Martin Sustrik's avatar
Martin Sustrik committed
129 130
}

131
int zmq::ctx_t::terminate ()
Martin Sustrik's avatar
Martin Sustrik committed
132
{
133
    slot_sync.lock();
134

135 136
    bool saveTerminating = terminating;
    terminating = false;
137

138
    // Connect up any pending inproc connections, otherwise we will hang
139 140 141
    pending_connections_t copy = pending_connections;
    for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
142 143
        // create_socket might fail eg: out of memory/sockets limit reached
        zmq_assert (s);
144 145 146
        s->bind (p->first.c_str ());
        s->close ();
    }
147
    terminating = saveTerminating;
148

149
    if (!starting) {
150

151
#ifdef HAVE_FORK
Pieter Hintjens's avatar
Pieter Hintjens committed
152
        if (pid != getpid ()) {
153 154 155
            // we are a forked child process. Close all file descriptors
            // inherited from the parent.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
Martin Hurton's avatar
Martin Hurton committed
156
                sockets [i]->get_mailbox ()->forked ();
157

Martin Hurton's avatar
Martin Hurton committed
158
            term_mailbox.forked ();
159 160
        }
#endif
161

162 163 164
        //  Check whether termination was already underway, but interrupted and now
        //  restarted.
        bool restarted = terminating;
165
        terminating = true;
166

167 168 169 170 171 172 173 174 175 176
        //  First attempt to terminate the context.
        if (!restarted) {
            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
                sockets [i]->stop ();
            if (sockets.empty ())
                reaper->stop ();
        }
177
        slot_sync.unlock();
178 179 180 181 182 183

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
        int rc = term_mailbox.recv (&cmd, -1);
        if (rc == -1 && errno == EINTR)
            return -1;
184
        errno_assert (rc == 0);
185 186 187 188
        zmq_assert (cmd.type == command_t::done);
        slot_sync.lock ();
        zmq_assert (sockets.empty ());
    }
189
    slot_sync.unlock ();
190

Ilya Kulakov's avatar
Ilya Kulakov committed
191 192 193 194 195 196 197 198 199 200
#ifdef ZMQ_HAVE_VMCI
    vmci_sync.lock ();

    VMCISock_ReleaseAFValueFd (vmci_fd);
    vmci_family = -1;
    vmci_fd = -1;

    vmci_sync.unlock ();
#endif

201 202
    //  Deallocate the resources.
    delete this;
203

204 205
    return 0;
}
206

207 208
int zmq::ctx_t::shutdown ()
{
209
    scoped_lock_t locker(slot_sync);
210

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
    if (!starting && !terminating) {
        terminating = true;

        //  Send stop command to sockets so that any blocking calls
        //  can be interrupted. If there are no sockets we can ask reaper
        //  thread to stop.
        for (sockets_t::size_type i = 0; i != sockets.size (); i++)
            sockets [i]->stop ();
        if (sockets.empty ())
            reaper->stop ();
    }

    return 0;
}

226 227 228
int zmq::ctx_t::set (int option_, int optval_)
{
    int rc = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
229 230
    if (option_ == ZMQ_MAX_SOCKETS
    &&  optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) {
231
        scoped_lock_t locker(opt_sync);
232 233 234
        max_sockets = optval_;
    }
    else
235
    if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
236
        scoped_lock_t locker(opt_sync);
237 238
        io_thread_count = optval_;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
239 240
    else
    if (option_ == ZMQ_IPV6 && optval_ >= 0) {
241
        scoped_lock_t locker(opt_sync);
242
        ipv6 = (optval_ != 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
243
    }
244 245
    else
    if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
246
        scoped_lock_t locker(opt_sync);
247 248 249 250
        thread_priority = optval_;
    }
    else
    if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
251
        scoped_lock_t locker(opt_sync);
252 253
        thread_sched_policy = optval_;
    }
254
    else
255 256 257 258 259
    if (option_ == ZMQ_THREAD_AFFINITY && optval_ >= 0) {
        scoped_lock_t locker(opt_sync);
        thread_affinity = optval_;
    }
    else
f18m's avatar
f18m committed
260 261 262 263 264 265 266
    if (option_ == ZMQ_THREAD_NAME_PREFIX && optval_ >= 0) {
        std::ostringstream s;
        s << optval_;
        scoped_lock_t locker(opt_sync);
        thread_name_prefix = s.str();
    }
    else
267
    if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
268
        scoped_lock_t locker(opt_sync);
269 270
        blocky = (optval_ != 0);
    }
271 272
    else
    if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) {
273
        scoped_lock_t locker(opt_sync);
274 275
        max_msgsz = optval_ < INT_MAX? optval_: INT_MAX;
    }
276 277 278 279 280 281 282 283 284 285 286 287 288
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

int zmq::ctx_t::get (int option_)
{
    int rc = 0;
    if (option_ == ZMQ_MAX_SOCKETS)
        rc = max_sockets;
    else
289
    if (option_ == ZMQ_SOCKET_LIMIT)
290
        rc = clipped_maxsocket (65535);
291
    else
292 293
    if (option_ == ZMQ_IO_THREADS)
        rc = io_thread_count;
Pieter Hintjens's avatar
Pieter Hintjens committed
294 295 296
    else
    if (option_ == ZMQ_IPV6)
        rc = ipv6;
297 298 299
    else
    if (option_ == ZMQ_BLOCKY)
        rc = blocky;
300 301 302
    else
    if (option_ == ZMQ_MAX_MSGSZ)
        rc = max_msgsz;
303 304 305
    else
    if (option_ == ZMQ_MSG_T_SIZE)
        rc = sizeof (zmq_msg_t);
306 307 308 309 310 311 312
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

313 314
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
315
    scoped_lock_t locker(slot_sync);
316

317 318 319 320
    if (unlikely (starting)) {

        starting = false;
        //  Initialise the array of mailboxes. Additional three slots are for
Pieter Hintjens's avatar
Pieter Hintjens committed
321
        //  zmq_ctx_term thread and reaper thread.
322 323 324 325 326
        opt_sync.lock ();
        int mazmq = max_sockets;
        int ios = io_thread_count;
        opt_sync.unlock ();
        slot_count = mazmq + ios + 2;
somdoron's avatar
somdoron committed
327
        slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
328 329
        alloc_assert (slots);

Pieter Hintjens's avatar
Pieter Hintjens committed
330
        //  Initialise the infrastructure for zmq_ctx_term thread.
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
        slots [term_tid] = &term_mailbox;

        //  Create the reaper thread.
        reaper = new (std::nothrow) reaper_t (this, reaper_tid);
        alloc_assert (reaper);
        slots [reaper_tid] = reaper->get_mailbox ();
        reaper->start ();

        //  Create I/O thread objects and launch them.
        for (int i = 2; i != ios + 2; i++) {
            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
            alloc_assert (io_thread);
            io_threads.push_back (io_thread);
            slots [i] = io_thread->get_mailbox ();
            io_thread->start ();
        }

        //  In the unused part of the slot array, create a list of empty slots.
        for (int32_t i = (int32_t) slot_count - 1;
              i >= (int32_t) ios + 2; i--) {
            empty_slots.push_back (i);
            slots [i] = NULL;
        }
    }

Pieter Hintjens's avatar
Pieter Hintjens committed
356
    //  Once zmq_ctx_term() was called, we can't create new sockets.
357 358 359 360
    if (terminating) {
        errno = ETERM;
        return NULL;
    }
361

362 363 364 365
    //  If max_sockets limit was reached, return error.
    if (empty_slots.empty ()) {
        errno = EMFILE;
        return NULL;
Martin Sustrik's avatar
Martin Sustrik committed
366
    }
367

368 369 370
    //  Choose a slot for the socket.
    uint32_t slot = empty_slots.back ();
    empty_slots.pop_back ();
371

372 373 374
    //  Generate new unique socket ID.
    int sid = ((int) max_socket_id.add (1)) + 1;

375
    //  Create the socket and register its mailbox.
376
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
377 378
    if (!s) {
        empty_slots.push_back (slot);
379
        return NULL;
380 381
    }
    sockets.push_back (s);
382
    slots [slot] = s->get_mailbox ();
383 384

    return s;
Martin Sustrik's avatar
Martin Sustrik committed
385 386
}

387
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
388
{
389
    scoped_lock_t locker(slot_sync);
390

Martin Hurton's avatar
Martin Hurton committed
391
    //  Free the associated thread slot.
392 393
    uint32_t tid = socket_->get_tid ();
    empty_slots.push_back (tid);
Martin Hurton's avatar
Martin Hurton committed
394
    slots [tid] = NULL;
395

396 397 398
    //  Remove the socket from the list of sockets.
    sockets.erase (socket_);

Pieter Hintjens's avatar
Pieter Hintjens committed
399
    //  If zmq_ctx_term() was already called and there are no more socket
400 401 402
    //  we can ask reaper thread to terminate.
    if (terminating && sockets.empty ())
        reaper->stop ();
403 404
}

405 406 407 408 409
zmq::object_t *zmq::ctx_t::get_reaper ()
{
    return reaper;
}

410 411
void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
{
f18m's avatar
f18m committed
412 413
    static unsigned int nthreads_started = 0;

414
    thread_.setSchedulingParameters(thread_priority, thread_sched_policy, thread_affinity);
415
    thread_.start(tfn_, arg_);
416
#ifndef ZMQ_HAVE_ANDROID
f18m's avatar
f18m committed
417 418 419
    std::ostringstream s;
    s << thread_name_prefix << "/ZMQbg/" << nthreads_started;
    thread_.setThreadName (s.str().c_str());
420
#endif
f18m's avatar
f18m committed
421
    nthreads_started++;
422 423
}

Martin Sustrik's avatar
Martin Sustrik committed
424
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
425
{
Martin Sustrik's avatar
Martin Sustrik committed
426
    slots [tid_]->send (command_);
427 428
}

429
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
Martin Sustrik's avatar
Martin Sustrik committed
430
{
431 432 433
    if (io_threads.empty ())
        return NULL;

Martin Sustrik's avatar
Martin Sustrik committed
434
    //  Find the I/O thread with minimum load.
435
    int min_load = -1;
436
    io_thread_t *selected_io_thread = NULL;
437 438
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
Martin Sustrik's avatar
Martin Sustrik committed
439
            int load = io_threads [i]->get_load ();
440
            if (selected_io_thread == NULL || load < min_load) {
Martin Sustrik's avatar
Martin Sustrik committed
441
                min_load = load;
442
                selected_io_thread = io_threads [i];
Martin Sustrik's avatar
Martin Sustrik committed
443 444 445
            }
        }
    }
446
    return selected_io_thread;
Martin Sustrik's avatar
Martin Sustrik committed
447
}
Martin Sustrik's avatar
Martin Sustrik committed
448

449 450
int zmq::ctx_t::register_endpoint (const char *addr_,
        const endpoint_t &endpoint_)
451
{
452
    scoped_lock_t locker(endpoints_sync);
453

454
    const bool inserted = endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_)).second;
455 456 457 458 459 460 461
    if (!inserted) {
        errno = EADDRINUSE;
        return -1;
    }
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
462 463 464
int zmq::ctx_t::unregister_endpoint (
        const std::string &addr_, socket_base_t *socket_)
{
465
    scoped_lock_t locker(endpoints_sync);
Martin Hurton's avatar
Martin Hurton committed
466 467 468 469 470 471 472 473 474 475 476 477 478

    const endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end () || it->second.socket != socket_) {
        errno = ENOENT;
        return -1;
    }

    //  Remove endpoint.
    endpoints.erase (it);

    return 0;
}

479
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
480
{
481
    scoped_lock_t locker(endpoints_sync);
482 483 484

    endpoints_t::iterator it = endpoints.begin ();
    while (it != endpoints.end ()) {
485
        if (it->second.socket == socket_) {
486
            endpoints_t::iterator to_erase = it;
487
            ++it;
488 489 490
            endpoints.erase (to_erase);
            continue;
        }
491
        ++it;
492 493 494
    }
}

495
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
496
{
497
    scoped_lock_t locker(endpoints_sync);
498

499 500 501 502 503
    endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end ()) {
        errno = ECONNREFUSED;
        endpoint_t empty = {NULL, options_t()};
        return empty;
504
     }
505
     endpoint_t endpoint = it->second;
506 507 508

     //  Increment the command sequence number of the peer so that it won't
     //  get deallocated until "bind" command is issued by the caller.
509 510
     //  The subsequent 'bind' has to be called with inc_seqnum parameter
     //  set to false, so that the seqnum isn't incremented twice.
511
     endpoint.socket->inc_seqnum ();
512

513
     return endpoint;
514
}
515

Martin Hurton's avatar
Martin Hurton committed
516 517
void zmq::ctx_t::pend_connection (const std::string &addr_,
        const endpoint_t &endpoint_, pipe_t **pipes_)
518
{
519
    scoped_lock_t locker(endpoints_sync);
Martin Hurton's avatar
Martin Hurton committed
520

521
    const pending_connection_t pending_connection = {endpoint_, pipes_ [0], pipes_ [1]};
522

523
    endpoints_t::iterator it = endpoints.find (addr_);
Pieter Hintjens's avatar
Pieter Hintjens committed
524
    if (it == endpoints.end ()) {
525
        //  Still no bind.
Martin Hurton's avatar
Martin Hurton committed
526 527
        endpoint_.socket->inc_seqnum ();
        pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
528
    } else {
529 530 531
        //  Bind has happened in the mean time, connect directly
        connect_inproc_sockets(it->second.socket, it->second.options, pending_connection, connect_side);
    }
532 533
}

534
void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
535
{
536
    scoped_lock_t locker(endpoints_sync);
537

538 539
    std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_);
    for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
540 541 542 543 544
        connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);

    pending_connections.erase(pending.first, pending.second);
}

Pieter Hintjens's avatar
Pieter Hintjens committed
545
void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
Martin Hurton's avatar
Martin Hurton committed
546
    options_t& bind_options, const pending_connection_t &pending_connection_, side side_)
547 548
{
    bind_socket_->inc_seqnum();
Martin Hurton's avatar
Martin Hurton committed
549
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
550

551
    if (!bind_options.recv_routing_id) {
552 553 554 555 556 557 558
        msg_t msg;
        const bool ok = pending_connection_.bind_pipe->read (&msg);
        zmq_assert (ok);
        const int rc = msg.close ();
        errno_assert (rc == 0);
    }

559 560 561 562 563 564 565
    bool conflate = pending_connection_.endpoint.options.conflate &&
            (pending_connection_.endpoint.options.type == ZMQ_DEALER ||
             pending_connection_.endpoint.options.type == ZMQ_PULL ||
             pending_connection_.endpoint.options.type == ZMQ_PUSH ||
             pending_connection_.endpoint.options.type == ZMQ_PUB ||
             pending_connection_.endpoint.options.type == ZMQ_SUB);

566 567 568 569 570 571 572 573 574 575 576
    if (!conflate) {
        pending_connection_.connect_pipe->set_hwms_boost(bind_options.sndhwm, bind_options.rcvhwm);
        pending_connection_.bind_pipe->set_hwms_boost(pending_connection_.endpoint.options.sndhwm, pending_connection_.endpoint.options.rcvhwm);

        pending_connection_.connect_pipe->set_hwms(pending_connection_.endpoint.options.rcvhwm, pending_connection_.endpoint.options.sndhwm);
        pending_connection_.bind_pipe->set_hwms(bind_options.rcvhwm, bind_options.sndhwm);
    }
    else {
        pending_connection_.connect_pipe->set_hwms(-1, -1);
        pending_connection_.bind_pipe->set_hwms(-1, -1);
    }
577 578 579 580 581 582 583 584 585 586

    if (side_ == bind_side) {
        command_t cmd;
        cmd.type = command_t::bind;
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
        bind_socket_->process_command (cmd);
        bind_socket_->send_inproc_connected (pending_connection_.endpoint.socket);
    }
    else
        pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false);
587

588 589 590
    // When a ctx is terminated all pending inproc connection will be
    // connected, but the socket will already be closed and the pipe will be
    // in waiting_for_delimiter state, which means no more writes can be done
591
    // and the routing id write fails and causes an assert. Check if the socket
592
    // is open before sending.
593
    if (pending_connection_.endpoint.options.recv_routing_id &&
594
            pending_connection_.endpoint.socket->check_tag ()) {
595 596
        msg_t routing_id;
        const int rc = routing_id.init_size (bind_options.routing_id_size);
597
        errno_assert (rc == 0);
598 599 600
        memcpy (routing_id.data (), bind_options.routing_id, bind_options.routing_id_size);
        routing_id.set_flags (msg_t::routing_id);
        const bool written = pending_connection_.bind_pipe->write (&routing_id);
601 602
        zmq_assert (written);
        pending_connection_.bind_pipe->flush ();
603 604 605
    }
}

Ilya Kulakov's avatar
Ilya Kulakov committed
606 607 608 609
#ifdef ZMQ_HAVE_VMCI

int zmq::ctx_t::get_vmci_socket_family ()
{
610
    zmq::scoped_lock_t locker(vmci_sync);
Ilya Kulakov's avatar
Ilya Kulakov committed
611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627

    if (vmci_fd == -1)  {
        vmci_family = VMCISock_GetAFValueFd (&vmci_fd);

        if (vmci_fd != -1) {
#ifdef FD_CLOEXEC
            int rc = fcntl (vmci_fd, F_SETFD, FD_CLOEXEC);
            errno_assert (rc != -1);
#endif
        }
    }

    return vmci_family;
}

#endif

628 629 630 631
//  The last used socket ID, or 0 if no socket was used so far. Note that this
//  is a global variable. Thus, even sockets created in different contexts have
//  unique IDs.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;