ctx.cpp 18.5 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#include "platform.hpp"
Martin Hurton's avatar
Martin Hurton committed
33
#ifdef ZMQ_HAVE_WINDOWS
34 35 36 37 38
#include "windows.hpp"
#else
#include <unistd.h>
#endif

39
#include <limits>
40
#include <climits>
41
#include <new>
42
#include <string.h>
43

44
#include "ctx.hpp"
45
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
46
#include "io_thread.hpp"
47
#include "reaper.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
48
#include "pipe.hpp"
49 50
#include "err.hpp"
#include "msg.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
51

52 53
#if defined (ZMQ_USE_TWEETNACL)
#   include "tweetnacl.h"
54
#elif defined (ZMQ_USE_LIBSODIUM)
55
#   include "sodium.h"
56 57
#endif

Ilya Kulakov's avatar
Ilya Kulakov committed
58 59 60 61
#ifdef ZMQ_HAVE_VMCI
#include <vmci_sockets.h>
#endif

62 63 64
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
#define ZMQ_CTX_TAG_VALUE_BAD  0xdeadbeef

65
int clipped_maxsocket (int max_requested)
66
{
Richard Newton's avatar
Richard Newton committed
67
    if (max_requested >= zmq::poller_t::max_fds () && zmq::poller_t::max_fds () != -1)
Pieter Hintjens's avatar
Pieter Hintjens committed
68 69
        // -1 because we need room for the reaper mailbox.
        max_requested = zmq::poller_t::max_fds () - 1;
Martin Hurton's avatar
Martin Hurton committed
70

71 72 73
    return max_requested;
}

74
zmq::ctx_t::ctx_t () :
75
    tag (ZMQ_CTX_TAG_VALUE_GOOD),
76 77 78 79 80
    starting (true),
    terminating (false),
    reaper (NULL),
    slot_count (0),
    slots (NULL),
Richard Newton's avatar
Richard Newton committed
81
    max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
82
    max_msgsz (INT_MAX),
Pieter Hintjens's avatar
Pieter Hintjens committed
83
    io_thread_count (ZMQ_IO_THREADS_DFLT),
84
    blocky (true),
85 86 87
    ipv6 (false),
    thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
    thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
Martin Sustrik's avatar
Martin Sustrik committed
88
{
89 90 91
#ifdef HAVE_FORK
    pid = getpid();
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
92 93 94 95
#ifdef ZMQ_HAVE_VMCI
    vmci_fd = -1;
    vmci_family = -1;
#endif
96 97 98 99 100 101

    crypto_sync.lock ();
#if defined (ZMQ_USE_TWEETNACL)
    // allow opening of /dev/urandom
    unsigned char tmpbytes[4];
    randombytes(tmpbytes, 4);
102
#elif defined (ZMQ_USE_SODIUM)
103 104 105 106
    int rc = sodium_init ();
    zmq_assert (rc != -1);
#endif
    crypto_sync.unlock ();
107 108
}

109 110
bool zmq::ctx_t::check_tag ()
{
111
    return tag == ZMQ_CTX_TAG_VALUE_GOOD;
112 113
}

114
zmq::ctx_t::~ctx_t ()
Martin Sustrik's avatar
Martin Sustrik committed
115
{
116
    //  Check that there are no remaining sockets.
117 118
    zmq_assert (sockets.empty ());

119 120
    //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
    //  thread subsequent invocation of destructor would hang-up.
121
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
Martin Sustrik's avatar
Martin Sustrik committed
122
        io_threads [i]->stop ();
123
    }
Martin Sustrik's avatar
Martin Sustrik committed
124 125

    //  Wait till I/O threads actually terminate.
126 127 128
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        LIBZMQ_DELETE(io_threads [i]);
    }
Martin Sustrik's avatar
Martin Sustrik committed
129

130
    //  Deallocate the reaper thread object.
131
    LIBZMQ_DELETE(reaper);
132

133 134
    //  Deallocate the array of mailboxes. No special work is
    //  needed as mailboxes themselves were deallocated with their
135
    //  corresponding io_thread/socket objects.
136
    free (slots);
137

138 139
    //  If we've done any Curve encryption, we may have a file handle
    //  to /dev/urandom open that needs to be cleaned up.
140 141
#ifdef ZMQ_HAVE_CURVE
    randombytes_close ();
142 143
#endif

144
    //  Remove the tag, so that the object is considered dead.
145
    tag = ZMQ_CTX_TAG_VALUE_BAD;
Martin Sustrik's avatar
Martin Sustrik committed
146 147
}

148
int zmq::ctx_t::terminate ()
Martin Sustrik's avatar
Martin Sustrik committed
149
{
150
    slot_sync.lock();
151

152 153
    bool saveTerminating = terminating;
    terminating = false;
154

155
    // Connect up any pending inproc connections, otherwise we will hang
156 157 158 159 160 161
    pending_connections_t copy = pending_connections;
    for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
        s->bind (p->first.c_str ());
        s->close ();
    }
162
    terminating = saveTerminating;
163

164
    if (!starting) {
165

166
#ifdef HAVE_FORK
Pieter Hintjens's avatar
Pieter Hintjens committed
167
        if (pid != getpid ()) {
168 169 170
            // we are a forked child process. Close all file descriptors
            // inherited from the parent.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
Martin Hurton's avatar
Martin Hurton committed
171
                sockets [i]->get_mailbox ()->forked ();
172

Martin Hurton's avatar
Martin Hurton committed
173
            term_mailbox.forked ();
174 175
        }
#endif
176

177 178 179
        //  Check whether termination was already underway, but interrupted and now
        //  restarted.
        bool restarted = terminating;
180
        terminating = true;
181

182 183 184 185 186 187 188 189 190 191
        //  First attempt to terminate the context.
        if (!restarted) {
            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
                sockets [i]->stop ();
            if (sockets.empty ())
                reaper->stop ();
        }
192
        slot_sync.unlock();
193 194 195 196 197 198

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
        int rc = term_mailbox.recv (&cmd, -1);
        if (rc == -1 && errno == EINTR)
            return -1;
199
        errno_assert (rc == 0);
200 201 202 203
        zmq_assert (cmd.type == command_t::done);
        slot_sync.lock ();
        zmq_assert (sockets.empty ());
    }
204
    slot_sync.unlock ();
205

Ilya Kulakov's avatar
Ilya Kulakov committed
206 207 208 209 210 211 212 213 214 215
#ifdef ZMQ_HAVE_VMCI
    vmci_sync.lock ();

    VMCISock_ReleaseAFValueFd (vmci_fd);
    vmci_family = -1;
    vmci_fd = -1;

    vmci_sync.unlock ();
#endif

216 217
    //  Deallocate the resources.
    delete this;
218

219 220
    return 0;
}
221

222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
int zmq::ctx_t::shutdown ()
{
    slot_sync.lock ();
    if (!starting && !terminating) {
        terminating = true;

        //  Send stop command to sockets so that any blocking calls
        //  can be interrupted. If there are no sockets we can ask reaper
        //  thread to stop.
        for (sockets_t::size_type i = 0; i != sockets.size (); i++)
            sockets [i]->stop ();
        if (sockets.empty ())
            reaper->stop ();
    }
    slot_sync.unlock ();

    return 0;
}

241 242 243
int zmq::ctx_t::set (int option_, int optval_)
{
    int rc = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
244 245
    if (option_ == ZMQ_MAX_SOCKETS
    &&  optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) {
246 247 248 249 250
        opt_sync.lock ();
        max_sockets = optval_;
        opt_sync.unlock ();
    }
    else
251
    if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
252 253 254 255
        opt_sync.lock ();
        io_thread_count = optval_;
        opt_sync.unlock ();
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
256 257 258
    else
    if (option_ == ZMQ_IPV6 && optval_ >= 0) {
        opt_sync.lock ();
259
        ipv6 = (optval_ != 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
260 261
        opt_sync.unlock ();
    }
262 263 264 265
    else
    if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
        opt_sync.lock();
        thread_priority = optval_;
266
        opt_sync.unlock ();
267 268 269 270 271
    }
    else
    if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
        opt_sync.lock();
        thread_sched_policy = optval_;
272
        opt_sync.unlock ();
273
    }
274 275 276 277 278 279
    else
    if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
        opt_sync.lock ();
        blocky = (optval_ != 0);
        opt_sync.unlock ();
    }
280 281 282 283 284 285
    else
    if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) {
        opt_sync.lock ();
        max_msgsz = optval_ < INT_MAX? optval_: INT_MAX;
        opt_sync.unlock ();
    }
286 287 288 289 290 291 292 293 294 295 296 297 298
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

int zmq::ctx_t::get (int option_)
{
    int rc = 0;
    if (option_ == ZMQ_MAX_SOCKETS)
        rc = max_sockets;
    else
299
    if (option_ == ZMQ_SOCKET_LIMIT)
300
        rc = clipped_maxsocket (65535);
301
    else
302 303
    if (option_ == ZMQ_IO_THREADS)
        rc = io_thread_count;
Pieter Hintjens's avatar
Pieter Hintjens committed
304 305 306
    else
    if (option_ == ZMQ_IPV6)
        rc = ipv6;
307 308 309
    else
    if (option_ == ZMQ_BLOCKY)
        rc = blocky;
310 311 312
    else
    if (option_ == ZMQ_MAX_MSGSZ)
        rc = max_msgsz;
313 314 315 316 317 318 319
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

320 321
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
322
    slot_sync.lock ();
323 324 325 326
    if (unlikely (starting)) {

        starting = false;
        //  Initialise the array of mailboxes. Additional three slots are for
Pieter Hintjens's avatar
Pieter Hintjens committed
327
        //  zmq_ctx_term thread and reaper thread.
328 329 330 331 332
        opt_sync.lock ();
        int mazmq = max_sockets;
        int ios = io_thread_count;
        opt_sync.unlock ();
        slot_count = mazmq + ios + 2;
somdoron's avatar
somdoron committed
333
        slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
334 335
        alloc_assert (slots);

Pieter Hintjens's avatar
Pieter Hintjens committed
336
        //  Initialise the infrastructure for zmq_ctx_term thread.
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
        slots [term_tid] = &term_mailbox;

        //  Create the reaper thread.
        reaper = new (std::nothrow) reaper_t (this, reaper_tid);
        alloc_assert (reaper);
        slots [reaper_tid] = reaper->get_mailbox ();
        reaper->start ();

        //  Create I/O thread objects and launch them.
        for (int i = 2; i != ios + 2; i++) {
            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
            alloc_assert (io_thread);
            io_threads.push_back (io_thread);
            slots [i] = io_thread->get_mailbox ();
            io_thread->start ();
        }

        //  In the unused part of the slot array, create a list of empty slots.
        for (int32_t i = (int32_t) slot_count - 1;
              i >= (int32_t) ios + 2; i--) {
            empty_slots.push_back (i);
            slots [i] = NULL;
        }
    }

Pieter Hintjens's avatar
Pieter Hintjens committed
362
    //  Once zmq_ctx_term() was called, we can't create new sockets.
363 364 365 366 367
    if (terminating) {
        slot_sync.unlock ();
        errno = ETERM;
        return NULL;
    }
368

369 370 371 372 373
    //  If max_sockets limit was reached, return error.
    if (empty_slots.empty ()) {
        slot_sync.unlock ();
        errno = EMFILE;
        return NULL;
Martin Sustrik's avatar
Martin Sustrik committed
374
    }
375

376 377 378
    //  Choose a slot for the socket.
    uint32_t slot = empty_slots.back ();
    empty_slots.pop_back ();
379

380 381 382
    //  Generate new unique socket ID.
    int sid = ((int) max_socket_id.add (1)) + 1;

383
    //  Create the socket and register its mailbox.
384
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
385 386 387
    if (!s) {
        empty_slots.push_back (slot);
        slot_sync.unlock ();
388
        return NULL;
389 390
    }
    sockets.push_back (s);
391
    slots [slot] = s->get_mailbox ();
392

393
    slot_sync.unlock ();
394
    return s;
Martin Sustrik's avatar
Martin Sustrik committed
395 396
}

397
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
398
{
399 400
    slot_sync.lock ();

Martin Hurton's avatar
Martin Hurton committed
401
    //  Free the associated thread slot.
402 403
    uint32_t tid = socket_->get_tid ();
    empty_slots.push_back (tid);
Martin Hurton's avatar
Martin Hurton committed
404
    slots [tid] = NULL;
405

406 407 408
    //  Remove the socket from the list of sockets.
    sockets.erase (socket_);

Pieter Hintjens's avatar
Pieter Hintjens committed
409
    //  If zmq_ctx_term() was already called and there are no more socket
410 411 412
    //  we can ask reaper thread to terminate.
    if (terminating && sockets.empty ())
        reaper->stop ();
413 414

    slot_sync.unlock ();
415 416
}

417 418 419 420 421
zmq::object_t *zmq::ctx_t::get_reaper ()
{
    return reaper;
}

422 423 424 425 426 427
void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
{
    thread_.start(tfn_, arg_);
    thread_.setSchedulingParameters(thread_priority, thread_sched_policy);
}

Martin Sustrik's avatar
Martin Sustrik committed
428
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
429
{
Martin Sustrik's avatar
Martin Sustrik committed
430
    slots [tid_]->send (command_);
431 432
}

433
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
Martin Sustrik's avatar
Martin Sustrik committed
434
{
435 436 437
    if (io_threads.empty ())
        return NULL;

Martin Sustrik's avatar
Martin Sustrik committed
438
    //  Find the I/O thread with minimum load.
439
    int min_load = -1;
440
    io_thread_t *selected_io_thread = NULL;
441 442
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
Martin Sustrik's avatar
Martin Sustrik committed
443
            int load = io_threads [i]->get_load ();
444
            if (selected_io_thread == NULL || load < min_load) {
Martin Sustrik's avatar
Martin Sustrik committed
445
                min_load = load;
446
                selected_io_thread = io_threads [i];
Martin Sustrik's avatar
Martin Sustrik committed
447 448 449
            }
        }
    }
450
    return selected_io_thread;
Martin Sustrik's avatar
Martin Sustrik committed
451
}
Martin Sustrik's avatar
Martin Sustrik committed
452

453 454
int zmq::ctx_t::register_endpoint (const char *addr_,
        const endpoint_t &endpoint_)
455 456 457
{
    endpoints_sync.lock ();

Martin Hurton's avatar
Martin Hurton committed
458 459
    const bool inserted = endpoints.insert (
        endpoints_t::value_type (std::string (addr_), endpoint_)).second;
460 461 462

    endpoints_sync.unlock ();

463 464 465 466 467 468 469
    if (!inserted) {
        errno = EADDRINUSE;
        return -1;
    }
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489
int zmq::ctx_t::unregister_endpoint (
        const std::string &addr_, socket_base_t *socket_)
{
    endpoints_sync.lock ();

    const endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end () || it->second.socket != socket_) {
        endpoints_sync.unlock ();
        errno = ENOENT;
        return -1;
    }

    //  Remove endpoint.
    endpoints.erase (it);

    endpoints_sync.unlock ();

    return 0;
}

490
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
491 492 493 494 495
{
    endpoints_sync.lock ();

    endpoints_t::iterator it = endpoints.begin ();
    while (it != endpoints.end ()) {
496
        if (it->second.socket == socket_) {
497
            endpoints_t::iterator to_erase = it;
498
            ++it;
499 500 501
            endpoints.erase (to_erase);
            continue;
        }
502
        ++it;
503
    }
Martin Hurton's avatar
Martin Hurton committed
504

505 506 507
    endpoints_sync.unlock ();
}

508
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
509 510 511 512 513 514 515
{
     endpoints_sync.lock ();

     endpoints_t::iterator it = endpoints.find (addr_);
     if (it == endpoints.end ()) {
         endpoints_sync.unlock ();
         errno = ECONNREFUSED;
516 517
         endpoint_t empty = {NULL, options_t()};
         return empty;
518
     }
519
     endpoint_t endpoint = it->second;
520 521 522

     //  Increment the command sequence number of the peer so that it won't
     //  get deallocated until "bind" command is issued by the caller.
523 524
     //  The subsequent 'bind' has to be called with inc_seqnum parameter
     //  set to false, so that the seqnum isn't incremented twice.
525
     endpoint.socket->inc_seqnum ();
526 527

     endpoints_sync.unlock ();
528
     return endpoint;
529
}
530

Martin Hurton's avatar
Martin Hurton committed
531 532
void zmq::ctx_t::pend_connection (const std::string &addr_,
        const endpoint_t &endpoint_, pipe_t **pipes_)
533
{
Martin Hurton's avatar
Martin Hurton committed
534 535 536
    const pending_connection_t pending_connection =
        {endpoint_, pipes_ [0], pipes_ [1]};

537 538
    endpoints_sync.lock ();

539
    endpoints_t::iterator it = endpoints.find (addr_);
Pieter Hintjens's avatar
Pieter Hintjens committed
540
    if (it == endpoints.end ()) {
541
        // Still no bind.
Martin Hurton's avatar
Martin Hurton committed
542 543
        endpoint_.socket->inc_seqnum ();
        pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
544 545 546
    }
    else
        // Bind has happened in the mean time, connect directly
Martin Hurton's avatar
Martin Hurton committed
547
        connect_inproc_sockets (it->second.socket, it->second.options, pending_connection, connect_side);
548 549 550 551

    endpoints_sync.unlock ();
}

552
void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
553 554 555
{
    endpoints_sync.lock ();

556
    std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_);
557

558
    for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
559 560 561 562 563 564
        connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);

    pending_connections.erase(pending.first, pending.second);
    endpoints_sync.unlock ();
}

Pieter Hintjens's avatar
Pieter Hintjens committed
565
void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
Martin Hurton's avatar
Martin Hurton committed
566
    options_t& bind_options, const pending_connection_t &pending_connection_, side side_)
567 568
{
    bind_socket_->inc_seqnum();
Martin Hurton's avatar
Martin Hurton committed
569
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
570

571 572 573 574 575 576 577 578
    if (!bind_options.recv_identity) {
        msg_t msg;
        const bool ok = pending_connection_.bind_pipe->read (&msg);
        zmq_assert (ok);
        const int rc = msg.close ();
        errno_assert (rc == 0);
    }

579 580 581 582 583 584 585
    bool conflate = pending_connection_.endpoint.options.conflate &&
            (pending_connection_.endpoint.options.type == ZMQ_DEALER ||
             pending_connection_.endpoint.options.type == ZMQ_PULL ||
             pending_connection_.endpoint.options.type == ZMQ_PUSH ||
             pending_connection_.endpoint.options.type == ZMQ_PUB ||
             pending_connection_.endpoint.options.type == ZMQ_SUB);

586 587 588 589 590 591 592 593 594 595 596
    if (!conflate) {
        pending_connection_.connect_pipe->set_hwms_boost(bind_options.sndhwm, bind_options.rcvhwm);
        pending_connection_.bind_pipe->set_hwms_boost(pending_connection_.endpoint.options.sndhwm, pending_connection_.endpoint.options.rcvhwm);

        pending_connection_.connect_pipe->set_hwms(pending_connection_.endpoint.options.rcvhwm, pending_connection_.endpoint.options.sndhwm);
        pending_connection_.bind_pipe->set_hwms(bind_options.rcvhwm, bind_options.sndhwm);
    }
    else {
        pending_connection_.connect_pipe->set_hwms(-1, -1);
        pending_connection_.bind_pipe->set_hwms(-1, -1);
    }
597 598 599 600 601 602 603 604 605 606

    if (side_ == bind_side) {
        command_t cmd;
        cmd.type = command_t::bind;
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
        bind_socket_->process_command (cmd);
        bind_socket_->send_inproc_connected (pending_connection_.endpoint.socket);
    }
    else
        pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false);
607 608 609 610 611 612 613 614 615 616

    if (pending_connection_.endpoint.options.recv_identity) {
        msg_t id;
        int rc = id.init_size (bind_options.identity_size);
        errno_assert (rc == 0);
        memcpy (id.data (), bind_options.identity, bind_options.identity_size);
        id.set_flags (msg_t::identity);
        bool written = pending_connection_.bind_pipe->write (&id);
        zmq_assert (written);
        pending_connection_.bind_pipe->flush ();
617 618 619
    }
}

Ilya Kulakov's avatar
Ilya Kulakov committed
620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643
#ifdef ZMQ_HAVE_VMCI

int zmq::ctx_t::get_vmci_socket_family ()
{
    vmci_sync.lock ();

    if (vmci_fd == -1)  {
        vmci_family = VMCISock_GetAFValueFd (&vmci_fd);

        if (vmci_fd != -1) {
#ifdef FD_CLOEXEC
            int rc = fcntl (vmci_fd, F_SETFD, FD_CLOEXEC);
            errno_assert (rc != -1);
#endif
        }
    }

    vmci_sync.unlock ();

    return vmci_family;
}

#endif

644 645 646 647
//  The last used socket ID, or 0 if no socket was used so far. Note that this
//  is a global variable. Thus, even sockets created in different contexts have
//  unique IDs.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;