ctx.cpp 20.6 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#ifndef ZMQ_HAVE_WINDOWS
33 34 35
#include <unistd.h>
#endif

36
#include <limits>
37
#include <climits>
38
#include <new>
f18m's avatar
f18m committed
39
#include <sstream>
40
#include <string.h>
41

42
#include "ctx.hpp"
43
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
44
#include "io_thread.hpp"
45
#include "reaper.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
46
#include "pipe.hpp"
47 48
#include "err.hpp"
#include "msg.hpp"
49
#include "random.hpp"
50

Ilya Kulakov's avatar
Ilya Kulakov committed
51 52 53 54
#ifdef ZMQ_HAVE_VMCI
#include <vmci_sockets.h>
#endif

55
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
56
#define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
57

58
int clipped_maxsocket (int max_requested)
59
{
60 61
    if (max_requested >= zmq::poller_t::max_fds ()
        && zmq::poller_t::max_fds () != -1)
Pieter Hintjens's avatar
Pieter Hintjens committed
62 63
        // -1 because we need room for the reaper mailbox.
        max_requested = zmq::poller_t::max_fds () - 1;
Martin Hurton's avatar
Martin Hurton committed
64

65 66 67
    return max_requested;
}

68
zmq::ctx_t::ctx_t () :
69
    tag (ZMQ_CTX_TAG_VALUE_GOOD),
70 71 72 73 74
    starting (true),
    terminating (false),
    reaper (NULL),
    slot_count (0),
    slots (NULL),
Richard Newton's avatar
Richard Newton committed
75
    max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
76
    max_msgsz (INT_MAX),
Pieter Hintjens's avatar
Pieter Hintjens committed
77
    io_thread_count (ZMQ_IO_THREADS_DFLT),
78
    blocky (true),
79
    ipv6 (false)
Martin Sustrik's avatar
Martin Sustrik committed
80
{
81
#ifdef HAVE_FORK
82
    pid = getpid ();
83
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
84 85 86 87
#ifdef ZMQ_HAVE_VMCI
    vmci_fd = -1;
    vmci_family = -1;
#endif
88

89 90
    //  Initialise crypto library, if needed.
    zmq::random_open ();
91 92
}

93 94
bool zmq::ctx_t::check_tag ()
{
95
    return tag == ZMQ_CTX_TAG_VALUE_GOOD;
96 97
}

98
zmq::ctx_t::~ctx_t ()
Martin Sustrik's avatar
Martin Sustrik committed
99
{
100
    //  Check that there are no remaining sockets.
101 102
    zmq_assert (sockets.empty ());

103 104
    //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
    //  thread subsequent invocation of destructor would hang-up.
105
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
106
        io_threads[i]->stop ();
107
    }
Martin Sustrik's avatar
Martin Sustrik committed
108 109

    //  Wait till I/O threads actually terminate.
110
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
111
        LIBZMQ_DELETE (io_threads[i]);
112
    }
Martin Sustrik's avatar
Martin Sustrik committed
113

114
    //  Deallocate the reaper thread object.
115
    LIBZMQ_DELETE (reaper);
116

117 118
    //  Deallocate the array of mailboxes. No special work is
    //  needed as mailboxes themselves were deallocated with their
119
    //  corresponding io_thread/socket objects.
120
    free (slots);
121

122 123
    //  De-initialise crypto library, if needed.
    zmq::random_close ();
124

125
    //  Remove the tag, so that the object is considered dead.
126
    tag = ZMQ_CTX_TAG_VALUE_BAD;
Martin Sustrik's avatar
Martin Sustrik committed
127 128
}

129 130 131 132 133
bool zmq::ctx_t::valid () const
{
    return term_mailbox.valid ();
}

134
int zmq::ctx_t::terminate ()
Martin Sustrik's avatar
Martin Sustrik committed
135
{
136
    slot_sync.lock ();
137

138 139
    bool saveTerminating = terminating;
    terminating = false;
140

141
    // Connect up any pending inproc connections, otherwise we will hang
142
    pending_connections_t copy = pending_connections;
143 144
    for (pending_connections_t::iterator p = copy.begin (); p != copy.end ();
         ++p) {
145
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
146 147
        // create_socket might fail eg: out of memory/sockets limit reached
        zmq_assert (s);
148 149 150
        s->bind (p->first.c_str ());
        s->close ();
    }
151
    terminating = saveTerminating;
152

153
    if (!starting) {
154
#ifdef HAVE_FORK
Pieter Hintjens's avatar
Pieter Hintjens committed
155
        if (pid != getpid ()) {
156 157 158
            // we are a forked child process. Close all file descriptors
            // inherited from the parent.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
159
                sockets[i]->get_mailbox ()->forked ();
160

Martin Hurton's avatar
Martin Hurton committed
161
            term_mailbox.forked ();
162 163
        }
#endif
164

165 166 167
        //  Check whether termination was already underway, but interrupted and now
        //  restarted.
        bool restarted = terminating;
168
        terminating = true;
169

170 171 172 173 174 175
        //  First attempt to terminate the context.
        if (!restarted) {
            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
176
                sockets[i]->stop ();
177 178 179
            if (sockets.empty ())
                reaper->stop ();
        }
180
        slot_sync.unlock ();
181 182 183 184 185 186

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
        int rc = term_mailbox.recv (&cmd, -1);
        if (rc == -1 && errno == EINTR)
            return -1;
187
        errno_assert (rc == 0);
188 189 190 191
        zmq_assert (cmd.type == command_t::done);
        slot_sync.lock ();
        zmq_assert (sockets.empty ());
    }
192
    slot_sync.unlock ();
193

Ilya Kulakov's avatar
Ilya Kulakov committed
194 195 196 197 198 199 200 201 202 203
#ifdef ZMQ_HAVE_VMCI
    vmci_sync.lock ();

    VMCISock_ReleaseAFValueFd (vmci_fd);
    vmci_family = -1;
    vmci_fd = -1;

    vmci_sync.unlock ();
#endif

204 205
    //  Deallocate the resources.
    delete this;
206

207 208
    return 0;
}
209

210 211
int zmq::ctx_t::shutdown ()
{
212
    scoped_lock_t locker (slot_sync);
213

214 215 216 217 218 219 220
    if (!starting && !terminating) {
        terminating = true;

        //  Send stop command to sockets so that any blocking calls
        //  can be interrupted. If there are no sockets we can ask reaper
        //  thread to stop.
        for (sockets_t::size_type i = 0; i != sockets.size (); i++)
221
            sockets[i]->stop ();
222 223 224 225 226 227 228
        if (sockets.empty ())
            reaper->stop ();
    }

    return 0;
}

229 230 231
int zmq::ctx_t::set (int option_, int optval_)
{
    int rc = 0;
232 233 234
    if (option_ == ZMQ_MAX_SOCKETS && optval_ >= 1
        && optval_ == clipped_maxsocket (optval_)) {
        scoped_lock_t locker (opt_sync);
235
        max_sockets = optval_;
236 237
    } else if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
        scoped_lock_t locker (opt_sync);
238
        io_thread_count = optval_;
239 240
    } else if (option_ == ZMQ_IPV6 && optval_ >= 0) {
        scoped_lock_t locker (opt_sync);
241
        ipv6 = (optval_ != 0);
242 243
    } else if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
        scoped_lock_t locker (opt_sync);
244
        blocky = (optval_ != 0);
245 246 247 248
    } else if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) {
        scoped_lock_t locker (opt_sync);
        max_msgsz = optval_ < INT_MAX ? optval_ : INT_MAX;
    } else {
249
        rc = thread_ctx_t::set (option_, optval_);
250 251 252 253 254 255 256 257 258
    }
    return rc;
}

int zmq::ctx_t::get (int option_)
{
    int rc = 0;
    if (option_ == ZMQ_MAX_SOCKETS)
        rc = max_sockets;
259
    else if (option_ == ZMQ_SOCKET_LIMIT)
260
        rc = clipped_maxsocket (65535);
261
    else if (option_ == ZMQ_IO_THREADS)
262
        rc = io_thread_count;
263
    else if (option_ == ZMQ_IPV6)
Pieter Hintjens's avatar
Pieter Hintjens committed
264
        rc = ipv6;
265
    else if (option_ == ZMQ_BLOCKY)
266
        rc = blocky;
267
    else if (option_ == ZMQ_MAX_MSGSZ)
268
        rc = max_msgsz;
269
    else if (option_ == ZMQ_MSG_T_SIZE)
270
        rc = sizeof (zmq_msg_t);
271 272 273 274 275 276 277
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

278
bool zmq::ctx_t::start ()
279
{
280 281 282 283 284 285 286 287 288 289 290 291
    //  Initialise the array of mailboxes. Additional three slots are for
    //  zmq_ctx_term thread and reaper thread.
    opt_sync.lock ();
    int mazmq = max_sockets;
    int ios = io_thread_count;
    opt_sync.unlock ();
    slot_count = mazmq + ios + 2;
    slots = (i_mailbox **) malloc (sizeof (i_mailbox *) * slot_count);
    if (!slots) {
        errno = ENOMEM;
        goto fail;
    }
292

293 294
    //  Initialise the infrastructure for zmq_ctx_term thread.
    slots[term_tid] = &term_mailbox;
295

296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
    //  Create the reaper thread.
    reaper = new (std::nothrow) reaper_t (this, reaper_tid);
    if (!reaper) {
        errno = ENOMEM;
        goto fail_cleanup_slots;
    }
    if (!reaper->get_mailbox ()->valid ())
        goto fail_cleanup_reaper;
    slots[reaper_tid] = reaper->get_mailbox ();
    reaper->start ();

    //  Create I/O thread objects and launch them.
    for (int32_t i = (int32_t) slot_count - 1; i >= (int32_t) 2; i--) {
        slots[i] = NULL;
    }
311

312 313 314 315 316 317 318 319 320
    for (int i = 2; i != ios + 2; i++) {
        io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
        if (!io_thread) {
            errno = ENOMEM;
            goto fail_cleanup_reaper;
        }
        if (!io_thread->get_mailbox ()->valid ()) {
            delete io_thread;
            goto fail_cleanup_reaper;
321
        }
322
        io_threads.push_back (io_thread);
323
        slots[i] = io_thread->get_mailbox ();
324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
        io_thread->start ();
    }

    //  In the unused part of the slot array, create a list of empty slots.
    for (int32_t i = (int32_t) slot_count - 1; i >= (int32_t) ios + 2; i--) {
        empty_slots.push_back (i);
    }

    starting = false;
    return true;

fail_cleanup_reaper:
    reaper->stop ();
    delete reaper;
    reaper = NULL;

fail_cleanup_slots:
    free (slots);
    slots = NULL;

fail:
    return false;
}

zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
    scoped_lock_t locker (slot_sync);

    if (unlikely (starting)) {
        if (!start ())
            return NULL;
355 356
    }

Pieter Hintjens's avatar
Pieter Hintjens committed
357
    //  Once zmq_ctx_term() was called, we can't create new sockets.
358 359 360 361
    if (terminating) {
        errno = ETERM;
        return NULL;
    }
362

363 364 365 366
    //  If max_sockets limit was reached, return error.
    if (empty_slots.empty ()) {
        errno = EMFILE;
        return NULL;
Martin Sustrik's avatar
Martin Sustrik committed
367
    }
368

369 370 371
    //  Choose a slot for the socket.
    uint32_t slot = empty_slots.back ();
    empty_slots.pop_back ();
372

373 374 375
    //  Generate new unique socket ID.
    int sid = ((int) max_socket_id.add (1)) + 1;

376
    //  Create the socket and register its mailbox.
377
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
378 379
    if (!s) {
        empty_slots.push_back (slot);
380
        return NULL;
381 382
    }
    sockets.push_back (s);
383
    slots[slot] = s->get_mailbox ();
384 385

    return s;
Martin Sustrik's avatar
Martin Sustrik committed
386 387
}

388
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
389
{
390
    scoped_lock_t locker (slot_sync);
391

Martin Hurton's avatar
Martin Hurton committed
392
    //  Free the associated thread slot.
393 394
    uint32_t tid = socket_->get_tid ();
    empty_slots.push_back (tid);
395
    slots[tid] = NULL;
396

397 398 399
    //  Remove the socket from the list of sockets.
    sockets.erase (socket_);

Pieter Hintjens's avatar
Pieter Hintjens committed
400
    //  If zmq_ctx_term() was already called and there are no more socket
401 402 403
    //  we can ask reaper thread to terminate.
    if (terminating && sockets.empty ())
        reaper->stop ();
404 405
}

406 407 408 409 410
zmq::object_t *zmq::ctx_t::get_reaper ()
{
    return reaper;
}

411 412 413 414 415 416 417 418 419
zmq::thread_ctx_t::thread_ctx_t () :
    thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
    thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
{
}

void zmq::thread_ctx_t::start_thread (thread_t &thread_,
                                      thread_fn *tfn_,
                                      void *arg_) const
420
{
f18m's avatar
f18m committed
421 422
    static unsigned int nthreads_started = 0;

423 424 425
    thread_.setSchedulingParameters (thread_priority, thread_sched_policy,
                                     thread_affinity_cpus);
    thread_.start (tfn_, arg_);
426
#ifndef ZMQ_HAVE_ANDROID
f18m's avatar
f18m committed
427
    std::ostringstream s;
428
    if (!thread_name_prefix.empty ())
429 430
        s << thread_name_prefix << "/";
    s << "ZMQbg/" << nthreads_started;
431
    thread_.setThreadName (s.str ().c_str ());
432
#endif
f18m's avatar
f18m committed
433
    nthreads_started++;
434 435
}

436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468
int zmq::thread_ctx_t::set (int option_, int optval_)
{
    int rc = 0;
    if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
        scoped_lock_t locker (opt_sync);
        thread_sched_policy = optval_;
    } else if (option_ == ZMQ_THREAD_AFFINITY_CPU_ADD && optval_ >= 0) {
        scoped_lock_t locker (opt_sync);
        thread_affinity_cpus.insert (optval_);
    } else if (option_ == ZMQ_THREAD_AFFINITY_CPU_REMOVE && optval_ >= 0) {
        scoped_lock_t locker (opt_sync);
        std::set<int>::iterator it = thread_affinity_cpus.find (optval_);
        if (it != thread_affinity_cpus.end ()) {
            thread_affinity_cpus.erase (it);
        } else {
            errno = EINVAL;
            rc = -1;
        }
    } else if (option_ == ZMQ_THREAD_NAME_PREFIX && optval_ >= 0) {
        std::ostringstream s;
        s << optval_;
        scoped_lock_t locker (opt_sync);
        thread_name_prefix = s.str ();
    } else if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
        scoped_lock_t locker (opt_sync);
        thread_priority = optval_;
    } else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

Martin Sustrik's avatar
Martin Sustrik committed
469
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
470
{
471
    slots[tid_]->send (command_);
472 473
}

474
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
Martin Sustrik's avatar
Martin Sustrik committed
475
{
476 477 478
    if (io_threads.empty ())
        return NULL;

Martin Sustrik's avatar
Martin Sustrik committed
479
    //  Find the I/O thread with minimum load.
480
    int min_load = -1;
481
    io_thread_t *selected_io_thread = NULL;
482 483
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
484
            int load = io_threads[i]->get_load ();
485
            if (selected_io_thread == NULL || load < min_load) {
Martin Sustrik's avatar
Martin Sustrik committed
486
                min_load = load;
487
                selected_io_thread = io_threads[i];
Martin Sustrik's avatar
Martin Sustrik committed
488 489 490
            }
        }
    }
491
    return selected_io_thread;
Martin Sustrik's avatar
Martin Sustrik committed
492
}
Martin Sustrik's avatar
Martin Sustrik committed
493

494
int zmq::ctx_t::register_endpoint (const char *addr_,
495
                                   const endpoint_t &endpoint_)
496
{
497
    scoped_lock_t locker (endpoints_sync);
498

499 500
    const bool inserted =
      endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (addr_, endpoint_).second;
501 502 503 504 505 506 507
    if (!inserted) {
        errno = EADDRINUSE;
        return -1;
    }
    return 0;
}

508 509
int zmq::ctx_t::unregister_endpoint (const std::string &addr_,
                                     socket_base_t *socket_)
Martin Hurton's avatar
Martin Hurton committed
510
{
511
    scoped_lock_t locker (endpoints_sync);
Martin Hurton's avatar
Martin Hurton committed
512 513 514 515 516 517 518 519 520 521 522 523 524

    const endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end () || it->second.socket != socket_) {
        errno = ENOENT;
        return -1;
    }

    //  Remove endpoint.
    endpoints.erase (it);

    return 0;
}

525
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
526
{
527
    scoped_lock_t locker (endpoints_sync);
528 529 530

    endpoints_t::iterator it = endpoints.begin ();
    while (it != endpoints.end ()) {
531
        if (it->second.socket == socket_) {
532
            endpoints_t::iterator to_erase = it;
533
            ++it;
534 535 536
            endpoints.erase (to_erase);
            continue;
        }
537
        ++it;
538 539 540
    }
}

541
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
542
{
543
    scoped_lock_t locker (endpoints_sync);
544

545 546 547
    endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end ()) {
        errno = ECONNREFUSED;
548
        endpoint_t empty = {NULL, options_t ()};
549
        return empty;
550 551
    }
    endpoint_t endpoint = it->second;
552

553 554 555 556 557
    //  Increment the command sequence number of the peer so that it won't
    //  get deallocated until "bind" command is issued by the caller.
    //  The subsequent 'bind' has to be called with inc_seqnum parameter
    //  set to false, so that the seqnum isn't incremented twice.
    endpoint.socket->inc_seqnum ();
558

559
    return endpoint;
560
}
561

Martin Hurton's avatar
Martin Hurton committed
562
void zmq::ctx_t::pend_connection (const std::string &addr_,
563 564
                                  const endpoint_t &endpoint_,
                                  pipe_t **pipes_)
565
{
566
    scoped_lock_t locker (endpoints_sync);
Martin Hurton's avatar
Martin Hurton committed
567

568 569
    const pending_connection_t pending_connection = {endpoint_, pipes_[0],
                                                     pipes_[1]};
570

571
    endpoints_t::iterator it = endpoints.find (addr_);
Pieter Hintjens's avatar
Pieter Hintjens committed
572
    if (it == endpoints.end ()) {
573
        //  Still no bind.
Martin Hurton's avatar
Martin Hurton committed
574
        endpoint_.socket->inc_seqnum ();
575 576
        pending_connections.ZMQ_MAP_INSERT_OR_EMPLACE (addr_,
                                                       pending_connection);
577
    } else {
578
        //  Bind has happened in the mean time, connect directly
579 580
        connect_inproc_sockets (it->second.socket, it->second.options,
                                pending_connection, connect_side);
581
    }
582 583
}

584 585
void zmq::ctx_t::connect_pending (const char *addr_,
                                  zmq::socket_base_t *bind_socket_)
586
{
587
    scoped_lock_t locker (endpoints_sync);
588

589 590 591 592 593 594
    std::pair<pending_connections_t::iterator, pending_connections_t::iterator>
      pending = pending_connections.equal_range (addr_);
    for (pending_connections_t::iterator p = pending.first; p != pending.second;
         ++p)
        connect_inproc_sockets (bind_socket_, endpoints[addr_].options,
                                p->second, bind_side);
595

596
    pending_connections.erase (pending.first, pending.second);
597 598
}

599 600 601 602 603
void zmq::ctx_t::connect_inproc_sockets (
  zmq::socket_base_t *bind_socket_,
  options_t &bind_options,
  const pending_connection_t &pending_connection_,
  side side_)
604
{
605
    bind_socket_->inc_seqnum ();
Martin Hurton's avatar
Martin Hurton committed
606
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
607

608
    if (!bind_options.recv_routing_id) {
609 610 611 612 613 614 615
        msg_t msg;
        const bool ok = pending_connection_.bind_pipe->read (&msg);
        zmq_assert (ok);
        const int rc = msg.close ();
        errno_assert (rc == 0);
    }

616 617 618 619 620 621 622
    bool conflate =
      pending_connection_.endpoint.options.conflate
      && (pending_connection_.endpoint.options.type == ZMQ_DEALER
          || pending_connection_.endpoint.options.type == ZMQ_PULL
          || pending_connection_.endpoint.options.type == ZMQ_PUSH
          || pending_connection_.endpoint.options.type == ZMQ_PUB
          || pending_connection_.endpoint.options.type == ZMQ_SUB);
623

624
    if (!conflate) {
625 626 627 628 629 630 631 632 633 634 635 636 637 638
        pending_connection_.connect_pipe->set_hwms_boost (bind_options.sndhwm,
                                                          bind_options.rcvhwm);
        pending_connection_.bind_pipe->set_hwms_boost (
          pending_connection_.endpoint.options.sndhwm,
          pending_connection_.endpoint.options.rcvhwm);

        pending_connection_.connect_pipe->set_hwms (
          pending_connection_.endpoint.options.rcvhwm,
          pending_connection_.endpoint.options.sndhwm);
        pending_connection_.bind_pipe->set_hwms (bind_options.rcvhwm,
                                                 bind_options.sndhwm);
    } else {
        pending_connection_.connect_pipe->set_hwms (-1, -1);
        pending_connection_.bind_pipe->set_hwms (-1, -1);
639
    }
640 641 642 643 644 645

    if (side_ == bind_side) {
        command_t cmd;
        cmd.type = command_t::bind;
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
        bind_socket_->process_command (cmd);
646 647 648 649 650
        bind_socket_->send_inproc_connected (
          pending_connection_.endpoint.socket);
    } else
        pending_connection_.connect_pipe->send_bind (
          bind_socket_, pending_connection_.bind_pipe, false);
651

652 653 654
    // When a ctx is terminated all pending inproc connection will be
    // connected, but the socket will already be closed and the pipe will be
    // in waiting_for_delimiter state, which means no more writes can be done
655
    // and the routing id write fails and causes an assert. Check if the socket
656
    // is open before sending.
657 658
    if (pending_connection_.endpoint.options.recv_routing_id
        && pending_connection_.endpoint.socket->check_tag ()) {
659 660
        msg_t routing_id;
        const int rc = routing_id.init_size (bind_options.routing_id_size);
661
        errno_assert (rc == 0);
662 663
        memcpy (routing_id.data (), bind_options.routing_id,
                bind_options.routing_id_size);
664 665
        routing_id.set_flags (msg_t::routing_id);
        const bool written = pending_connection_.bind_pipe->write (&routing_id);
666 667
        zmq_assert (written);
        pending_connection_.bind_pipe->flush ();
668 669 670
    }
}

Ilya Kulakov's avatar
Ilya Kulakov committed
671 672 673 674
#ifdef ZMQ_HAVE_VMCI

int zmq::ctx_t::get_vmci_socket_family ()
{
675
    zmq::scoped_lock_t locker (vmci_sync);
Ilya Kulakov's avatar
Ilya Kulakov committed
676

677
    if (vmci_fd == -1) {
Ilya Kulakov's avatar
Ilya Kulakov committed
678 679 680 681 682 683 684 685 686 687 688 689 690 691 692
        vmci_family = VMCISock_GetAFValueFd (&vmci_fd);

        if (vmci_fd != -1) {
#ifdef FD_CLOEXEC
            int rc = fcntl (vmci_fd, F_SETFD, FD_CLOEXEC);
            errno_assert (rc != -1);
#endif
        }
    }

    return vmci_family;
}

#endif

693 694 695 696
//  The last used socket ID, or 0 if no socket was used so far. Note that this
//  is a global variable. Thus, even sockets created in different contexts have
//  unique IDs.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;