ctx.cpp 18.6 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#ifndef ZMQ_HAVE_WINDOWS
33 34 35
#include <unistd.h>
#endif

36
#include <limits>
37
#include <climits>
38
#include <new>
39
#include <string.h>
40

41
#include "ctx.hpp"
42
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
43
#include "io_thread.hpp"
44
#include "reaper.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
45
#include "pipe.hpp"
46 47
#include "err.hpp"
#include "msg.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
48

49 50
#if defined (ZMQ_USE_TWEETNACL)
#   include "tweetnacl.h"
51
#elif defined (ZMQ_USE_LIBSODIUM)
52
#   include "sodium.h"
53 54
#endif

Ilya Kulakov's avatar
Ilya Kulakov committed
55 56 57 58
#ifdef ZMQ_HAVE_VMCI
#include <vmci_sockets.h>
#endif

59 60 61
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
#define ZMQ_CTX_TAG_VALUE_BAD  0xdeadbeef

62
int clipped_maxsocket (int max_requested)
63
{
Richard Newton's avatar
Richard Newton committed
64
    if (max_requested >= zmq::poller_t::max_fds () && zmq::poller_t::max_fds () != -1)
Pieter Hintjens's avatar
Pieter Hintjens committed
65 66
        // -1 because we need room for the reaper mailbox.
        max_requested = zmq::poller_t::max_fds () - 1;
Martin Hurton's avatar
Martin Hurton committed
67

68 69 70
    return max_requested;
}

71
zmq::ctx_t::ctx_t () :
72
    tag (ZMQ_CTX_TAG_VALUE_GOOD),
73 74 75 76 77
    starting (true),
    terminating (false),
    reaper (NULL),
    slot_count (0),
    slots (NULL),
Richard Newton's avatar
Richard Newton committed
78
    max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
79
    max_msgsz (INT_MAX),
Pieter Hintjens's avatar
Pieter Hintjens committed
80
    io_thread_count (ZMQ_IO_THREADS_DFLT),
81
    blocky (true),
82 83 84
    ipv6 (false),
    thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
    thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
Martin Sustrik's avatar
Martin Sustrik committed
85
{
86 87 88
#ifdef HAVE_FORK
    pid = getpid();
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
89 90 91 92
#ifdef ZMQ_HAVE_VMCI
    vmci_fd = -1;
    vmci_family = -1;
#endif
93

94
    scoped_lock_t locker(crypto_sync);
95 96 97 98
#if defined (ZMQ_USE_TWEETNACL)
    // allow opening of /dev/urandom
    unsigned char tmpbytes[4];
    randombytes(tmpbytes, 4);
99
#elif defined (ZMQ_USE_LIBSODIUM)
100 101 102
    int rc = sodium_init ();
    zmq_assert (rc != -1);
#endif
103 104
}

105 106
bool zmq::ctx_t::check_tag ()
{
107
    return tag == ZMQ_CTX_TAG_VALUE_GOOD;
108 109
}

110
zmq::ctx_t::~ctx_t ()
Martin Sustrik's avatar
Martin Sustrik committed
111
{
112
    //  Check that there are no remaining sockets.
113 114
    zmq_assert (sockets.empty ());

115 116
    //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
    //  thread subsequent invocation of destructor would hang-up.
117
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
Martin Sustrik's avatar
Martin Sustrik committed
118
        io_threads [i]->stop ();
119
    }
Martin Sustrik's avatar
Martin Sustrik committed
120 121

    //  Wait till I/O threads actually terminate.
122 123 124
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        LIBZMQ_DELETE(io_threads [i]);
    }
Martin Sustrik's avatar
Martin Sustrik committed
125

126
    //  Deallocate the reaper thread object.
127
    LIBZMQ_DELETE(reaper);
128

129 130
    //  Deallocate the array of mailboxes. No special work is
    //  needed as mailboxes themselves were deallocated with their
131
    //  corresponding io_thread/socket objects.
132
    free (slots);
133

134 135
    //  If we've done any Curve encryption, we may have a file handle
    //  to /dev/urandom open that needs to be cleaned up.
136 137
#ifdef ZMQ_HAVE_CURVE
    randombytes_close ();
138 139
#endif

140
    //  Remove the tag, so that the object is considered dead.
141
    tag = ZMQ_CTX_TAG_VALUE_BAD;
Martin Sustrik's avatar
Martin Sustrik committed
142 143
}

144
int zmq::ctx_t::terminate ()
Martin Sustrik's avatar
Martin Sustrik committed
145
{
146
    slot_sync.lock();
147

148 149
    bool saveTerminating = terminating;
    terminating = false;
150

151
    // Connect up any pending inproc connections, otherwise we will hang
152 153 154
    pending_connections_t copy = pending_connections;
    for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
155 156
        // create_socket might fail eg: out of memory/sockets limit reached
        zmq_assert (s);
157 158 159
        s->bind (p->first.c_str ());
        s->close ();
    }
160
    terminating = saveTerminating;
161

162
    if (!starting) {
163

164
#ifdef HAVE_FORK
Pieter Hintjens's avatar
Pieter Hintjens committed
165
        if (pid != getpid ()) {
166 167 168
            // we are a forked child process. Close all file descriptors
            // inherited from the parent.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
Martin Hurton's avatar
Martin Hurton committed
169
                sockets [i]->get_mailbox ()->forked ();
170

Martin Hurton's avatar
Martin Hurton committed
171
            term_mailbox.forked ();
172 173
        }
#endif
174

175 176 177
        //  Check whether termination was already underway, but interrupted and now
        //  restarted.
        bool restarted = terminating;
178
        terminating = true;
179

180 181 182 183 184 185 186 187 188 189
        //  First attempt to terminate the context.
        if (!restarted) {
            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
                sockets [i]->stop ();
            if (sockets.empty ())
                reaper->stop ();
        }
190
        slot_sync.unlock();
191 192 193 194 195 196

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
        int rc = term_mailbox.recv (&cmd, -1);
        if (rc == -1 && errno == EINTR)
            return -1;
197
        errno_assert (rc == 0);
198 199 200 201
        zmq_assert (cmd.type == command_t::done);
        slot_sync.lock ();
        zmq_assert (sockets.empty ());
    }
202
    slot_sync.unlock ();
203

Ilya Kulakov's avatar
Ilya Kulakov committed
204 205 206 207 208 209 210 211 212 213
#ifdef ZMQ_HAVE_VMCI
    vmci_sync.lock ();

    VMCISock_ReleaseAFValueFd (vmci_fd);
    vmci_family = -1;
    vmci_fd = -1;

    vmci_sync.unlock ();
#endif

214 215
    //  Deallocate the resources.
    delete this;
216

217 218
    return 0;
}
219

220 221
int zmq::ctx_t::shutdown ()
{
222
    scoped_lock_t locker(slot_sync);
223

224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
    if (!starting && !terminating) {
        terminating = true;

        //  Send stop command to sockets so that any blocking calls
        //  can be interrupted. If there are no sockets we can ask reaper
        //  thread to stop.
        for (sockets_t::size_type i = 0; i != sockets.size (); i++)
            sockets [i]->stop ();
        if (sockets.empty ())
            reaper->stop ();
    }

    return 0;
}

239 240 241
int zmq::ctx_t::set (int option_, int optval_)
{
    int rc = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
242 243
    if (option_ == ZMQ_MAX_SOCKETS
    &&  optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) {
244
        scoped_lock_t locker(opt_sync);
245 246 247
        max_sockets = optval_;
    }
    else
248
    if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
249
        scoped_lock_t locker(opt_sync);
250 251
        io_thread_count = optval_;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
252 253
    else
    if (option_ == ZMQ_IPV6 && optval_ >= 0) {
254
        scoped_lock_t locker(opt_sync);
255
        ipv6 = (optval_ != 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
256
    }
257 258
    else
    if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
259
        scoped_lock_t locker(opt_sync);
260 261 262 263
        thread_priority = optval_;
    }
    else
    if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
264
        scoped_lock_t locker(opt_sync);
265 266
        thread_sched_policy = optval_;
    }
267 268
    else
    if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
269
        scoped_lock_t locker(opt_sync);
270 271
        blocky = (optval_ != 0);
    }
272 273
    else
    if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) {
274
        scoped_lock_t locker(opt_sync);
275 276
        max_msgsz = optval_ < INT_MAX? optval_: INT_MAX;
    }
277 278 279 280 281 282 283 284 285 286 287 288 289
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

int zmq::ctx_t::get (int option_)
{
    int rc = 0;
    if (option_ == ZMQ_MAX_SOCKETS)
        rc = max_sockets;
    else
290
    if (option_ == ZMQ_SOCKET_LIMIT)
291
        rc = clipped_maxsocket (65535);
292
    else
293 294
    if (option_ == ZMQ_IO_THREADS)
        rc = io_thread_count;
Pieter Hintjens's avatar
Pieter Hintjens committed
295 296 297
    else
    if (option_ == ZMQ_IPV6)
        rc = ipv6;
298 299 300
    else
    if (option_ == ZMQ_BLOCKY)
        rc = blocky;
301 302 303
    else
    if (option_ == ZMQ_MAX_MSGSZ)
        rc = max_msgsz;
304 305 306
    else
    if (option_ == ZMQ_MSG_T_SIZE)
        rc = sizeof (zmq_msg_t);
307 308 309 310 311 312 313
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

314 315
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
316
    scoped_lock_t locker(slot_sync);
317

318 319 320 321
    if (unlikely (starting)) {

        starting = false;
        //  Initialise the array of mailboxes. Additional three slots are for
Pieter Hintjens's avatar
Pieter Hintjens committed
322
        //  zmq_ctx_term thread and reaper thread.
323 324 325 326 327
        opt_sync.lock ();
        int mazmq = max_sockets;
        int ios = io_thread_count;
        opt_sync.unlock ();
        slot_count = mazmq + ios + 2;
somdoron's avatar
somdoron committed
328
        slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
329 330
        alloc_assert (slots);

Pieter Hintjens's avatar
Pieter Hintjens committed
331
        //  Initialise the infrastructure for zmq_ctx_term thread.
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
        slots [term_tid] = &term_mailbox;

        //  Create the reaper thread.
        reaper = new (std::nothrow) reaper_t (this, reaper_tid);
        alloc_assert (reaper);
        slots [reaper_tid] = reaper->get_mailbox ();
        reaper->start ();

        //  Create I/O thread objects and launch them.
        for (int i = 2; i != ios + 2; i++) {
            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
            alloc_assert (io_thread);
            io_threads.push_back (io_thread);
            slots [i] = io_thread->get_mailbox ();
            io_thread->start ();
        }

        //  In the unused part of the slot array, create a list of empty slots.
        for (int32_t i = (int32_t) slot_count - 1;
              i >= (int32_t) ios + 2; i--) {
            empty_slots.push_back (i);
            slots [i] = NULL;
        }
    }

Pieter Hintjens's avatar
Pieter Hintjens committed
357
    //  Once zmq_ctx_term() was called, we can't create new sockets.
358 359 360 361
    if (terminating) {
        errno = ETERM;
        return NULL;
    }
362

363 364 365 366
    //  If max_sockets limit was reached, return error.
    if (empty_slots.empty ()) {
        errno = EMFILE;
        return NULL;
Martin Sustrik's avatar
Martin Sustrik committed
367
    }
368

369 370 371
    //  Choose a slot for the socket.
    uint32_t slot = empty_slots.back ();
    empty_slots.pop_back ();
372

373 374 375
    //  Generate new unique socket ID.
    int sid = ((int) max_socket_id.add (1)) + 1;

376
    //  Create the socket and register its mailbox.
377
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
378 379
    if (!s) {
        empty_slots.push_back (slot);
380
        return NULL;
381 382
    }
    sockets.push_back (s);
383
    slots [slot] = s->get_mailbox ();
384 385

    return s;
Martin Sustrik's avatar
Martin Sustrik committed
386 387
}

388
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
389
{
390
    scoped_lock_t locker(slot_sync);
391

Martin Hurton's avatar
Martin Hurton committed
392
    //  Free the associated thread slot.
393 394
    uint32_t tid = socket_->get_tid ();
    empty_slots.push_back (tid);
Martin Hurton's avatar
Martin Hurton committed
395
    slots [tid] = NULL;
396

397 398 399
    //  Remove the socket from the list of sockets.
    sockets.erase (socket_);

Pieter Hintjens's avatar
Pieter Hintjens committed
400
    //  If zmq_ctx_term() was already called and there are no more socket
401 402 403
    //  we can ask reaper thread to terminate.
    if (terminating && sockets.empty ())
        reaper->stop ();
404 405
}

406 407 408 409 410
zmq::object_t *zmq::ctx_t::get_reaper ()
{
    return reaper;
}

411 412 413 414
void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
{
    thread_.start(tfn_, arg_);
    thread_.setSchedulingParameters(thread_priority, thread_sched_policy);
415
#ifndef ZMQ_HAVE_ANDROID
416
    thread_.setThreadName ("ZMQ background");
417
#endif
418 419
}

Martin Sustrik's avatar
Martin Sustrik committed
420
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
421
{
Martin Sustrik's avatar
Martin Sustrik committed
422
    slots [tid_]->send (command_);
423 424
}

425
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
Martin Sustrik's avatar
Martin Sustrik committed
426
{
427 428 429
    if (io_threads.empty ())
        return NULL;

Martin Sustrik's avatar
Martin Sustrik committed
430
    //  Find the I/O thread with minimum load.
431
    int min_load = -1;
432
    io_thread_t *selected_io_thread = NULL;
433 434
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
Martin Sustrik's avatar
Martin Sustrik committed
435
            int load = io_threads [i]->get_load ();
436
            if (selected_io_thread == NULL || load < min_load) {
Martin Sustrik's avatar
Martin Sustrik committed
437
                min_load = load;
438
                selected_io_thread = io_threads [i];
Martin Sustrik's avatar
Martin Sustrik committed
439 440 441
            }
        }
    }
442
    return selected_io_thread;
Martin Sustrik's avatar
Martin Sustrik committed
443
}
Martin Sustrik's avatar
Martin Sustrik committed
444

445 446
int zmq::ctx_t::register_endpoint (const char *addr_,
        const endpoint_t &endpoint_)
447
{
448
    scoped_lock_t locker(endpoints_sync);
449

450
    const bool inserted = endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_)).second;
451 452 453 454 455 456 457
    if (!inserted) {
        errno = EADDRINUSE;
        return -1;
    }
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
458 459 460
int zmq::ctx_t::unregister_endpoint (
        const std::string &addr_, socket_base_t *socket_)
{
461
    scoped_lock_t locker(endpoints_sync);
Martin Hurton's avatar
Martin Hurton committed
462 463 464 465 466 467 468 469 470 471 472 473 474

    const endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end () || it->second.socket != socket_) {
        errno = ENOENT;
        return -1;
    }

    //  Remove endpoint.
    endpoints.erase (it);

    return 0;
}

475
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
476
{
477
    scoped_lock_t locker(endpoints_sync);
478 479 480

    endpoints_t::iterator it = endpoints.begin ();
    while (it != endpoints.end ()) {
481
        if (it->second.socket == socket_) {
482
            endpoints_t::iterator to_erase = it;
483
            ++it;
484 485 486
            endpoints.erase (to_erase);
            continue;
        }
487
        ++it;
488 489 490
    }
}

491
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
492
{
493
    scoped_lock_t locker(endpoints_sync);
494

495 496 497 498 499
    endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end ()) {
        errno = ECONNREFUSED;
        endpoint_t empty = {NULL, options_t()};
        return empty;
500
     }
501
     endpoint_t endpoint = it->second;
502 503 504

     //  Increment the command sequence number of the peer so that it won't
     //  get deallocated until "bind" command is issued by the caller.
505 506
     //  The subsequent 'bind' has to be called with inc_seqnum parameter
     //  set to false, so that the seqnum isn't incremented twice.
507
     endpoint.socket->inc_seqnum ();
508

509
     return endpoint;
510
}
511

Martin Hurton's avatar
Martin Hurton committed
512 513
void zmq::ctx_t::pend_connection (const std::string &addr_,
        const endpoint_t &endpoint_, pipe_t **pipes_)
514
{
515
    scoped_lock_t locker(endpoints_sync);
Martin Hurton's avatar
Martin Hurton committed
516

517
    const pending_connection_t pending_connection = {endpoint_, pipes_ [0], pipes_ [1]};
518

519
    endpoints_t::iterator it = endpoints.find (addr_);
Pieter Hintjens's avatar
Pieter Hintjens committed
520
    if (it == endpoints.end ()) {
521
        //  Still no bind.
Martin Hurton's avatar
Martin Hurton committed
522 523
        endpoint_.socket->inc_seqnum ();
        pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
524
    } else {
525 526 527
        //  Bind has happened in the mean time, connect directly
        connect_inproc_sockets(it->second.socket, it->second.options, pending_connection, connect_side);
    }
528 529
}

530
void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
531
{
532
    scoped_lock_t locker(endpoints_sync);
533

534 535
    std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_);
    for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
536 537 538 539 540
        connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);

    pending_connections.erase(pending.first, pending.second);
}

Pieter Hintjens's avatar
Pieter Hintjens committed
541
void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
Martin Hurton's avatar
Martin Hurton committed
542
    options_t& bind_options, const pending_connection_t &pending_connection_, side side_)
543 544
{
    bind_socket_->inc_seqnum();
Martin Hurton's avatar
Martin Hurton committed
545
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
546

547 548 549 550 551 552 553 554
    if (!bind_options.recv_identity) {
        msg_t msg;
        const bool ok = pending_connection_.bind_pipe->read (&msg);
        zmq_assert (ok);
        const int rc = msg.close ();
        errno_assert (rc == 0);
    }

555 556 557 558 559 560 561
    bool conflate = pending_connection_.endpoint.options.conflate &&
            (pending_connection_.endpoint.options.type == ZMQ_DEALER ||
             pending_connection_.endpoint.options.type == ZMQ_PULL ||
             pending_connection_.endpoint.options.type == ZMQ_PUSH ||
             pending_connection_.endpoint.options.type == ZMQ_PUB ||
             pending_connection_.endpoint.options.type == ZMQ_SUB);

562 563 564 565 566 567 568 569 570 571 572
    if (!conflate) {
        pending_connection_.connect_pipe->set_hwms_boost(bind_options.sndhwm, bind_options.rcvhwm);
        pending_connection_.bind_pipe->set_hwms_boost(pending_connection_.endpoint.options.sndhwm, pending_connection_.endpoint.options.rcvhwm);

        pending_connection_.connect_pipe->set_hwms(pending_connection_.endpoint.options.rcvhwm, pending_connection_.endpoint.options.sndhwm);
        pending_connection_.bind_pipe->set_hwms(bind_options.rcvhwm, bind_options.sndhwm);
    }
    else {
        pending_connection_.connect_pipe->set_hwms(-1, -1);
        pending_connection_.bind_pipe->set_hwms(-1, -1);
    }
573 574 575 576 577 578 579 580 581 582

    if (side_ == bind_side) {
        command_t cmd;
        cmd.type = command_t::bind;
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
        bind_socket_->process_command (cmd);
        bind_socket_->send_inproc_connected (pending_connection_.endpoint.socket);
    }
    else
        pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false);
583

584 585 586 587 588 589 590
    // When a ctx is terminated all pending inproc connection will be
    // connected, but the socket will already be closed and the pipe will be
    // in waiting_for_delimiter state, which means no more writes can be done
    // and the identity write fails and causes an assert. Check if the socket
    // is open before sending.
    if (pending_connection_.endpoint.options.recv_identity &&
            pending_connection_.endpoint.socket->check_tag ()) {
591
        msg_t id;
592
        const int rc = id.init_size (bind_options.identity_size);
593 594 595
        errno_assert (rc == 0);
        memcpy (id.data (), bind_options.identity, bind_options.identity_size);
        id.set_flags (msg_t::identity);
596
        const bool written = pending_connection_.bind_pipe->write (&id);
597 598
        zmq_assert (written);
        pending_connection_.bind_pipe->flush ();
599 600 601
    }
}

Ilya Kulakov's avatar
Ilya Kulakov committed
602 603 604 605
#ifdef ZMQ_HAVE_VMCI

int zmq::ctx_t::get_vmci_socket_family ()
{
606
    zmq::scoped_lock_t locker(vmci_sync);
Ilya Kulakov's avatar
Ilya Kulakov committed
607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623

    if (vmci_fd == -1)  {
        vmci_family = VMCISock_GetAFValueFd (&vmci_fd);

        if (vmci_fd != -1) {
#ifdef FD_CLOEXEC
            int rc = fcntl (vmci_fd, F_SETFD, FD_CLOEXEC);
            errno_assert (rc != -1);
#endif
        }
    }

    return vmci_family;
}

#endif

624 625 626 627
//  The last used socket ID, or 0 if no socket was used so far. Note that this
//  is a global variable. Thus, even sockets created in different contexts have
//  unique IDs.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;