ctx.cpp 18.2 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#include "platform.hpp"
Martin Hurton's avatar
Martin Hurton committed
33
#ifdef ZMQ_HAVE_WINDOWS
34 35 36 37 38
#include "windows.hpp"
#else
#include <unistd.h>
#endif

39
#include <limits>
40
#include <climits>
41
#include <new>
42
#include <string.h>
43

44
#include "ctx.hpp"
45
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
46
#include "io_thread.hpp"
47
#include "reaper.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
48
#include "pipe.hpp"
49 50
#include "err.hpp"
#include "msg.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
51

52 53
#if defined (ZMQ_USE_TWEETNACL)
#   include "tweetnacl.h"
54 55
#elif defined (HAVE_LIBSODIUM)
#   include "sodium.h"
56 57
#endif

Ilya Kulakov's avatar
Ilya Kulakov committed
58 59 60 61
#ifdef ZMQ_HAVE_VMCI
#include <vmci_sockets.h>
#endif

62 63 64
//  Values stored in ctx_t::tag. The constructor writes the GOOD value, the
//  destructor overwrites it with the BAD one, and check_tag () compares the
//  field against GOOD.
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
#define ZMQ_CTX_TAG_VALUE_BAD  0xdeadbeef

65
int clipped_maxsocket (int max_requested)
66
{
Richard Newton's avatar
Richard Newton committed
67
    if (max_requested >= zmq::poller_t::max_fds () && zmq::poller_t::max_fds () != -1)
Pieter Hintjens's avatar
Pieter Hintjens committed
68 69
        // -1 because we need room for the reaper mailbox.
        max_requested = zmq::poller_t::max_fds () - 1;
Martin Hurton's avatar
Martin Hurton committed
70

71 72 73
    return max_requested;
}

74
//  Builds a context in its initial state: tagged as valid, flagged as
//  'starting', with no reaper and no mailbox slots, and with every option
//  at its compile-time default.
zmq::ctx_t::ctx_t () :
    tag (ZMQ_CTX_TAG_VALUE_GOOD),
    starting (true),
    terminating (false),
    reaper (NULL),
    slot_count (0),
    slots (NULL),
    max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
    max_msgsz (INT_MAX),
    io_thread_count (ZMQ_IO_THREADS_DFLT),
    blocky (true),
    ipv6 (false),
    thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
    thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
{
#ifdef HAVE_FORK
    //  Remember the creating process; terminate () compares against it.
    pid = getpid ();
#endif

#ifdef ZMQ_HAVE_VMCI
    //  -1 marks "no VMCI descriptor/family obtained yet".
    vmci_family = -1;
    vmci_fd = -1;
#endif
}

98 99
bool zmq::ctx_t::check_tag ()
{
100
    return tag == ZMQ_CTX_TAG_VALUE_GOOD;
101 102
}

103
//  Tears the context down. All sockets must already be gone; the I/O
//  threads and the reaper are stopped/deleted here and the slot array freed.
zmq::ctx_t::~ctx_t ()
{
    //  Check that there are no remaining sockets.
    zmq_assert (sockets.empty ());

    const io_threads_t::size_type thread_count = io_threads.size ();

    //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
    //  thread subsequent invocation of destructor would hang-up.
    io_threads_t::size_type idx = 0;
    while (idx != thread_count) {
        io_threads [idx]->stop ();
        ++idx;
    }

    //  Wait till I/O threads actually terminate.
    idx = 0;
    while (idx != thread_count) {
        LIBZMQ_DELETE(io_threads [idx]);
        ++idx;
    }

    //  Deallocate the reaper thread object.
    LIBZMQ_DELETE(reaper);

    //  Deallocate the array of mailboxes. No special work is needed as
    //  mailboxes themselves were deallocated with their corresponding
    //  io_thread/socket objects.
    free (slots);

#ifdef ZMQ_HAVE_CURVE
    //  If we've done any Curve encryption, we may have a file handle
    //  to /dev/urandom open that needs to be cleaned up.
    randombytes_close ();
#endif

    //  Remove the tag, so that the object is considered dead.
    tag = ZMQ_CTX_TAG_VALUE_BAD;
}

137
int zmq::ctx_t::terminate ()
Martin Sustrik's avatar
Martin Sustrik committed
138
{
139
    slot_sync.lock();
140

141 142
    bool saveTerminating = terminating;
    terminating = false;
143

144
    // Connect up any pending inproc connections, otherwise we will hang
145 146 147 148 149 150
    pending_connections_t copy = pending_connections;
    for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
        s->bind (p->first.c_str ());
        s->close ();
    }
151
    terminating = saveTerminating;
152

153
    if (!starting) {
154

155
#ifdef HAVE_FORK
Pieter Hintjens's avatar
Pieter Hintjens committed
156
        if (pid != getpid ()) {
157 158 159
            // we are a forked child process. Close all file descriptors
            // inherited from the parent.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
Martin Hurton's avatar
Martin Hurton committed
160
                sockets [i]->get_mailbox ()->forked ();
161

Martin Hurton's avatar
Martin Hurton committed
162
            term_mailbox.forked ();
163 164
        }
#endif
165

166 167 168
        //  Check whether termination was already underway, but interrupted and now
        //  restarted.
        bool restarted = terminating;
169
        terminating = true;
170

171 172 173 174 175 176 177 178 179 180
        //  First attempt to terminate the context.
        if (!restarted) {
            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
                sockets [i]->stop ();
            if (sockets.empty ())
                reaper->stop ();
        }
181
        slot_sync.unlock();
182 183 184 185 186 187

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
        int rc = term_mailbox.recv (&cmd, -1);
        if (rc == -1 && errno == EINTR)
            return -1;
188
        errno_assert (rc == 0);
189 190 191 192
        zmq_assert (cmd.type == command_t::done);
        slot_sync.lock ();
        zmq_assert (sockets.empty ());
    }
193
    slot_sync.unlock ();
194

Ilya Kulakov's avatar
Ilya Kulakov committed
195 196 197 198 199 200 201 202 203 204
#ifdef ZMQ_HAVE_VMCI
    vmci_sync.lock ();

    VMCISock_ReleaseAFValueFd (vmci_fd);
    vmci_family = -1;
    vmci_fd = -1;

    vmci_sync.unlock ();
#endif

205 206
    //  Deallocate the resources.
    delete this;
207

208 209
    return 0;
}
210

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
int zmq::ctx_t::shutdown ()
{
    slot_sync.lock ();
    if (!starting && !terminating) {
        terminating = true;

        //  Send stop command to sockets so that any blocking calls
        //  can be interrupted. If there are no sockets we can ask reaper
        //  thread to stop.
        for (sockets_t::size_type i = 0; i != sockets.size (); i++)
            sockets [i]->stop ();
        if (sockets.empty ())
            reaper->stop ();
    }
    slot_sync.unlock ();

    return 0;
}

230 231 232
int zmq::ctx_t::set (int option_, int optval_)
{
    int rc = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
233 234
    if (option_ == ZMQ_MAX_SOCKETS
    &&  optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) {
235 236 237 238 239
        opt_sync.lock ();
        max_sockets = optval_;
        opt_sync.unlock ();
    }
    else
240
    if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
241 242 243 244
        opt_sync.lock ();
        io_thread_count = optval_;
        opt_sync.unlock ();
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
245 246 247
    else
    if (option_ == ZMQ_IPV6 && optval_ >= 0) {
        opt_sync.lock ();
248
        ipv6 = (optval_ != 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
249 250
        opt_sync.unlock ();
    }
251 252 253 254
    else
    if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
        opt_sync.lock();
        thread_priority = optval_;
255
        opt_sync.unlock ();
256 257 258 259 260
    }
    else
    if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
        opt_sync.lock();
        thread_sched_policy = optval_;
261
        opt_sync.unlock ();
262
    }
263 264 265 266 267 268
    else
    if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
        opt_sync.lock ();
        blocky = (optval_ != 0);
        opt_sync.unlock ();
    }
269 270 271 272 273 274
    else
    if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) {
        opt_sync.lock ();
        max_msgsz = optval_ < INT_MAX? optval_: INT_MAX;
        opt_sync.unlock ();
    }
275 276 277 278 279 280 281 282 283 284 285 286 287
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

int zmq::ctx_t::get (int option_)
{
    int rc = 0;
    if (option_ == ZMQ_MAX_SOCKETS)
        rc = max_sockets;
    else
288
    if (option_ == ZMQ_SOCKET_LIMIT)
289
        rc = clipped_maxsocket (65535);
290
    else
291 292
    if (option_ == ZMQ_IO_THREADS)
        rc = io_thread_count;
Pieter Hintjens's avatar
Pieter Hintjens committed
293 294 295
    else
    if (option_ == ZMQ_IPV6)
        rc = ipv6;
296 297 298
    else
    if (option_ == ZMQ_BLOCKY)
        rc = blocky;
299 300 301
    else
    if (option_ == ZMQ_MAX_MSGSZ)
        rc = max_msgsz;
302 303 304 305 306 307 308
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

309 310
//  Creates a socket of the given type and registers its mailbox.
//  The first call also brings up the context infrastructure (slot array,
//  reaper thread, I/O threads). Returns NULL on failure: errno is ETERM when
//  the context is terminating, EMFILE when no slot is free, and is left as
//  set by socket_base_t::create when that call fails.
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
    slot_sync.lock ();

    if (unlikely (starting)) {

        starting = false;

        //  Snapshot the options under their own lock.
        opt_sync.lock ();
        const int socket_limit = max_sockets;
        const int io_count = io_thread_count;
        opt_sync.unlock ();

        //  Initialise the array of mailboxes. Additional two slots are for
        //  zmq_ctx_term thread and reaper thread.
        slot_count = socket_limit + io_count + 2;
        slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
        alloc_assert (slots);

        //  Initialise the infrastructure for zmq_ctx_term thread.
        slots [term_tid] = &term_mailbox;

        //  Create the reaper thread.
        reaper = new (std::nothrow) reaper_t (this, reaper_tid);
        alloc_assert (reaper);
        slots [reaper_tid] = reaper->get_mailbox ();
        reaper->start ();

        //  Create I/O thread objects and launch them.
        for (int tid = 2; tid != io_count + 2; tid++) {
            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, tid);
            alloc_assert (io_thread);
            io_threads.push_back (io_thread);
            slots [tid] = io_thread->get_mailbox ();
            io_thread->start ();
        }

        //  In the unused part of the slot array, create a list of empty slots.
        for (int32_t free_slot = (int32_t) slot_count - 1;
              free_slot >= (int32_t) io_count + 2; free_slot--) {
            empty_slots.push_back (free_slot);
            slots [free_slot] = NULL;
        }
    }

    //  Error code to report once the lock is released; 0 leaves errno alone.
    int failure = 0;
    socket_base_t *created = NULL;

    if (terminating) {
        //  Once zmq_ctx_term() was called, we can't create new sockets.
        failure = ETERM;
    }
    else
    if (empty_slots.empty ()) {
        //  If max_sockets limit was reached, return error.
        failure = EMFILE;
    }
    else {
        //  Choose a slot for the socket.
        uint32_t slot = empty_slots.back ();
        empty_slots.pop_back ();

        //  Generate new unique socket ID.
        int sid = ((int) max_socket_id.add (1)) + 1;

        //  Create the socket and register its mailbox.
        created = socket_base_t::create (type_, this, slot, sid);
        if (created) {
            sockets.push_back (created);
            slots [slot] = created->get_mailbox ();
        }
        else {
            //  Creation failed; hand the slot back.
            empty_slots.push_back (slot);
        }
    }

    slot_sync.unlock ();

    if (failure != 0)
        errno = failure;
    return created;
}

386
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
387
{
388 389
    slot_sync.lock ();

Martin Hurton's avatar
Martin Hurton committed
390
    //  Free the associated thread slot.
391 392
    uint32_t tid = socket_->get_tid ();
    empty_slots.push_back (tid);
Martin Hurton's avatar
Martin Hurton committed
393
    slots [tid] = NULL;
394

395 396 397
    //  Remove the socket from the list of sockets.
    sockets.erase (socket_);

Pieter Hintjens's avatar
Pieter Hintjens committed
398
    //  If zmq_ctx_term() was already called and there are no more socket
399 400 401
    //  we can ask reaper thread to terminate.
    if (terminating && sockets.empty ())
        reaper->stop ();
402 403

    slot_sync.unlock ();
404 405
}

406 407 408 409 410
//  Returns the reaper thread object (NULL until create_socket () has
//  created it).
zmq::object_t *zmq::ctx_t::get_reaper ()
{
    zmq::object_t *const reaper_object = reaper;
    return reaper_object;
}

411 412 413 414 415 416
//  Starts thread_ running tfn_ (arg_), then applies the context's configured
//  thread priority and scheduling policy to it.
void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
{
    //  The thread is launched first; scheduling parameters are set afterwards.
    thread_.start (tfn_, arg_);
    thread_.setSchedulingParameters (thread_priority, thread_sched_policy);
}

Martin Sustrik's avatar
Martin Sustrik committed
417
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
418
{
Martin Sustrik's avatar
Martin Sustrik committed
419
    slots [tid_]->send (command_);
420 421
}

422
//  Picks the least loaded I/O thread among those permitted by affinity_.
//  Bit i of affinity_ selects I/O thread i; a zero mask permits every thread.
//  Returns NULL when there are no I/O threads or none matches the mask.
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{
    if (io_threads.empty ())
        return NULL;

    //  Find the I/O thread with minimum load.
    int min_load = -1;
    io_thread_t *selected_io_thread = NULL;
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        //  The mask has only 64 bits and shifting a 64-bit value by 64 or
        //  more is undefined, so threads with index >= 64 can only be
        //  reached through the "any thread" (zero) mask.
        const bool eligible = !affinity_
            || (i < 64 && (affinity_ & (uint64_t (1) << i)));
        if (!eligible)
            continue;

        int load = io_threads [i]->get_load ();
        if (selected_io_thread == NULL || load < min_load) {
            min_load = load;
            selected_io_thread = io_threads [i];
        }
    }
    return selected_io_thread;
}
Martin Sustrik's avatar
Martin Sustrik committed
441

442 443
int zmq::ctx_t::register_endpoint (const char *addr_,
        const endpoint_t &endpoint_)
444 445 446
{
    endpoints_sync.lock ();

Martin Hurton's avatar
Martin Hurton committed
447 448
    const bool inserted = endpoints.insert (
        endpoints_t::value_type (std::string (addr_), endpoint_)).second;
449 450 451

    endpoints_sync.unlock ();

452 453 454 455 456 457 458
    if (!inserted) {
        errno = EADDRINUSE;
        return -1;
    }
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478
int zmq::ctx_t::unregister_endpoint (
        const std::string &addr_, socket_base_t *socket_)
{
    endpoints_sync.lock ();

    const endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end () || it->second.socket != socket_) {
        endpoints_sync.unlock ();
        errno = ENOENT;
        return -1;
    }

    //  Remove endpoint.
    endpoints.erase (it);

    endpoints_sync.unlock ();

    return 0;
}

479
//  Removes every endpoint registered by socket_.
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
{
    endpoints_sync.lock ();

    for (endpoints_t::iterator pos = endpoints.begin ();
          pos != endpoints.end (); ) {
        if (pos->second.socket == socket_)
            //  Step past the element before erasing it so that the loop
            //  iterator stays valid.
            endpoints.erase (pos++);
        else
            ++pos;
    }

    endpoints_sync.unlock ();
}

497
//  Looks up the endpoint registered under addr_.
//  On success a copy of the endpoint is returned and the owning socket's
//  command sequence number has been incremented. When nothing is registered
//  under addr_, errno is set to ECONNREFUSED and an endpoint with a NULL
//  socket is returned.
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
{
    endpoints_sync.lock ();

    const endpoints_t::iterator pos = endpoints.find (addr_);
    if (pos != endpoints.end ()) {
        endpoint_t found = pos->second;

        //  Increment the command sequence number of the peer so that it won't
        //  get deallocated until "bind" command is issued by the caller.
        //  The subsequent 'bind' has to be called with inc_seqnum parameter
        //  set to false, so that the seqnum isn't incremented twice.
        found.socket->inc_seqnum ();

        endpoints_sync.unlock ();
        return found;
    }

    endpoints_sync.unlock ();
    errno = ECONNREFUSED;
    endpoint_t none = {NULL, options_t ()};
    return none;
}
519

Martin Hurton's avatar
Martin Hurton committed
520 521
void zmq::ctx_t::pend_connection (const std::string &addr_,
        const endpoint_t &endpoint_, pipe_t **pipes_)
522
{
Martin Hurton's avatar
Martin Hurton committed
523 524 525
    const pending_connection_t pending_connection =
        {endpoint_, pipes_ [0], pipes_ [1]};

526 527
    endpoints_sync.lock ();

528
    endpoints_t::iterator it = endpoints.find (addr_);
Pieter Hintjens's avatar
Pieter Hintjens committed
529
    if (it == endpoints.end ()) {
530
        // Still no bind.
Martin Hurton's avatar
Martin Hurton committed
531 532
        endpoint_.socket->inc_seqnum ();
        pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
533 534 535
    }
    else
        // Bind has happened in the mean time, connect directly
Martin Hurton's avatar
Martin Hurton committed
536
        connect_inproc_sockets (it->second.socket, it->second.options, pending_connection, connect_side);
537 538 539 540

    endpoints_sync.unlock ();
}

541
void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
542 543 544
{
    endpoints_sync.lock ();

545
    std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_);
546

547
    for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
548 549 550 551 552 553
        connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);

    pending_connections.erase(pending.first, pending.second);
    endpoints_sync.unlock ();
}

Pieter Hintjens's avatar
Pieter Hintjens committed
554
void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
Martin Hurton's avatar
Martin Hurton committed
555
    options_t& bind_options, const pending_connection_t &pending_connection_, side side_)
556 557
{
    bind_socket_->inc_seqnum();
Martin Hurton's avatar
Martin Hurton committed
558
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
559

560 561 562 563 564 565 566 567
    if (!bind_options.recv_identity) {
        msg_t msg;
        const bool ok = pending_connection_.bind_pipe->read (&msg);
        zmq_assert (ok);
        const int rc = msg.close ();
        errno_assert (rc == 0);
    }

568 569 570 571 572 573 574
    bool conflate = pending_connection_.endpoint.options.conflate &&
            (pending_connection_.endpoint.options.type == ZMQ_DEALER ||
             pending_connection_.endpoint.options.type == ZMQ_PULL ||
             pending_connection_.endpoint.options.type == ZMQ_PUSH ||
             pending_connection_.endpoint.options.type == ZMQ_PUB ||
             pending_connection_.endpoint.options.type == ZMQ_SUB);

575 576 577 578 579 580 581 582 583 584 585
    if (!conflate) {
        pending_connection_.connect_pipe->set_hwms_boost(bind_options.sndhwm, bind_options.rcvhwm);
        pending_connection_.bind_pipe->set_hwms_boost(pending_connection_.endpoint.options.sndhwm, pending_connection_.endpoint.options.rcvhwm);

        pending_connection_.connect_pipe->set_hwms(pending_connection_.endpoint.options.rcvhwm, pending_connection_.endpoint.options.sndhwm);
        pending_connection_.bind_pipe->set_hwms(bind_options.rcvhwm, bind_options.sndhwm);
    }
    else {
        pending_connection_.connect_pipe->set_hwms(-1, -1);
        pending_connection_.bind_pipe->set_hwms(-1, -1);
    }
586 587 588 589 590 591 592 593 594 595

    if (side_ == bind_side) {
        command_t cmd;
        cmd.type = command_t::bind;
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
        bind_socket_->process_command (cmd);
        bind_socket_->send_inproc_connected (pending_connection_.endpoint.socket);
    }
    else
        pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false);
596 597 598 599 600 601 602 603 604 605

    if (pending_connection_.endpoint.options.recv_identity) {
        msg_t id;
        int rc = id.init_size (bind_options.identity_size);
        errno_assert (rc == 0);
        memcpy (id.data (), bind_options.identity, bind_options.identity_size);
        id.set_flags (msg_t::identity);
        bool written = pending_connection_.bind_pipe->write (&id);
        zmq_assert (written);
        pending_connection_.bind_pipe->flush ();
606 607 608
    }
}

Ilya Kulakov's avatar
Ilya Kulakov committed
609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632
#ifdef ZMQ_HAVE_VMCI

//  Returns the VMCI address family. On the first call (no descriptor stored
//  yet) the family and its descriptor are obtained from
//  VMCISock_GetAFValueFd and, where FD_CLOEXEC is available, the descriptor
//  is marked close-on-exec.
int zmq::ctx_t::get_vmci_socket_family ()
{
    vmci_sync.lock ();

    const bool not_opened_yet = (vmci_fd == -1);
    if (not_opened_yet)  {
        vmci_family = VMCISock_GetAFValueFd (&vmci_fd);

#ifdef FD_CLOEXEC
        if (vmci_fd != -1) {
            int rc = fcntl (vmci_fd, F_SETFD, FD_CLOEXEC);
            errno_assert (rc != -1);
        }
#endif
    }

    vmci_sync.unlock ();

    return vmci_family;
}

#endif

633 634 635 636
//  The last used socket ID, or 0 if no socket was used so far. Note that this
//  is a global variable. Thus, even sockets created in different contexts have
//  unique IDs. create_socket () bumps it via atomic_counter_t::add for every
//  socket it attempts to create.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;