socket_base.cpp 55.9 KB
Newer Older
1
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include <new>
32
#include <string>
33 34
#include <algorithm>

35
#include "macros.hpp"
36 37 38

#if defined ZMQ_HAVE_WINDOWS
#if defined _MSC_VER
39
#if defined _WIN32_WCE
boris@boressoft.ru's avatar
boris@boressoft.ru committed
40 41
#include <cmnintrin.h>
#else
42 43
#include <intrin.h>
#endif
boris@boressoft.ru's avatar
boris@boressoft.ru committed
44
#endif
45 46
#else
#include <unistd.h>
47
#include <ctype.h>
48
#endif
49

50
#include "socket_base.hpp"
51
#include "tcp_listener.hpp"
52
#include "ipc_listener.hpp"
53
#include "tipc_listener.hpp"
54
#include "tcp_connecter.hpp"
55
#include "io_thread.hpp"
56
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
57
#include "config.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
58
#include "pipe.hpp"
59
#include "err.hpp"
60
#include "ctx.hpp"
61
#include "likely.hpp"
62
#include "msg.hpp"
63 64 65
#include "address.hpp"
#include "ipc_address.hpp"
#include "tcp_address.hpp"
66
#include "udp_address.hpp"
67
#include "tipc_address.hpp"
somdoron's avatar
somdoron committed
68 69
#include "mailbox.hpp"
#include "mailbox_safe.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
70 71 72 73 74 75

#if defined ZMQ_HAVE_VMCI
#include "vmci_address.hpp"
#include "vmci_listener.hpp"
#endif

76 77 78
#ifdef ZMQ_HAVE_OPENPGM
#include "pgm_socket.hpp"
#endif
79

80 81 82 83 84 85 86
#include "pair.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "pull.hpp"
#include "push.hpp"
87 88
#include "dealer.hpp"
#include "router.hpp"
89 90
#include "xpub.hpp"
#include "xsub.hpp"
91
#include "stream.hpp"
92
#include "server.hpp"
93
#include "client.hpp"
somdoron's avatar
somdoron committed
94 95
#include "radio.hpp"
#include "dish.hpp"
somdoron's avatar
somdoron committed
96 97
#include "gather.hpp"
#include "scatter.hpp"
98
#include "dgram.hpp"
99

100 101
void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_,
                                             pipe_t *pipe_)
102
{
103
    _inprocs.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (endpoint_uri_), pipe_);
104 105
}

106 107
int zmq::socket_base_t::inprocs_t::erase_pipes (
  const std::string &endpoint_uri_str_)
108 109
{
    const std::pair<map_t::iterator, map_t::iterator> range =
110
      _inprocs.equal_range (endpoint_uri_str_);
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
    if (range.first == range.second) {
        errno = ENOENT;
        return -1;
    }

    for (map_t::iterator it = range.first; it != range.second; ++it)
        it->second->terminate (true);
    _inprocs.erase (range.first, range.second);
    return 0;
}

//  Removes the first map entry that refers to pipe_, if there is one.
//  The pipe itself is left untouched.
void zmq::socket_base_t::inprocs_t::erase_pipe (pipe_t *pipe_)
{
    const map_t::iterator stop = _inprocs.end ();
    map_t::iterator entry = _inprocs.begin ();
    while (entry != stop) {
        if (entry->second == pipe_) {
            //  Only one entry is removed; leave right away as the iterator
            //  is no longer usable after the erase.
            _inprocs.erase (entry);
            return;
        }
        ++entry;
    }
}

132
bool zmq::socket_base_t::check_tag () const
133
{
134
    return _tag == 0xbaddecaf;
135 136
}

137 138
bool zmq::socket_base_t::is_thread_safe () const
{
139
    return _thread_safe;
140 141
}

142 143 144 145
//  Factory for sockets: instantiates the concrete socket class matching
//  type_. Returns NULL with errno set to EINVAL for an unknown type, and
//  NULL when the new socket ended up without a mailbox.
zmq::socket_base_t *zmq::socket_base_t::create (int type_,
                                                class ctx_t *parent_,
                                                uint32_t tid_,
                                                int sid_)
{
    socket_base_t *sock = NULL;

    switch (type_) {
        case ZMQ_PAIR:
            sock = new (std::nothrow) pair_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUB:
            sock = new (std::nothrow) pub_t (parent_, tid_, sid_);
            break;
        case ZMQ_SUB:
            sock = new (std::nothrow) sub_t (parent_, tid_, sid_);
            break;
        case ZMQ_REQ:
            sock = new (std::nothrow) req_t (parent_, tid_, sid_);
            break;
        case ZMQ_REP:
            sock = new (std::nothrow) rep_t (parent_, tid_, sid_);
            break;
        case ZMQ_DEALER:
            sock = new (std::nothrow) dealer_t (parent_, tid_, sid_);
            break;
        case ZMQ_ROUTER:
            sock = new (std::nothrow) router_t (parent_, tid_, sid_);
            break;
        case ZMQ_PULL:
            sock = new (std::nothrow) pull_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUSH:
            sock = new (std::nothrow) push_t (parent_, tid_, sid_);
            break;
        case ZMQ_XPUB:
            sock = new (std::nothrow) xpub_t (parent_, tid_, sid_);
            break;
        case ZMQ_XSUB:
            sock = new (std::nothrow) xsub_t (parent_, tid_, sid_);
            break;
        case ZMQ_STREAM:
            sock = new (std::nothrow) stream_t (parent_, tid_, sid_);
            break;
        case ZMQ_SERVER:
            sock = new (std::nothrow) server_t (parent_, tid_, sid_);
            break;
        case ZMQ_CLIENT:
            sock = new (std::nothrow) client_t (parent_, tid_, sid_);
            break;
        case ZMQ_RADIO:
            sock = new (std::nothrow) radio_t (parent_, tid_, sid_);
            break;
        case ZMQ_DISH:
            sock = new (std::nothrow) dish_t (parent_, tid_, sid_);
            break;
        case ZMQ_GATHER:
            sock = new (std::nothrow) gather_t (parent_, tid_, sid_);
            break;
        case ZMQ_SCATTER:
            sock = new (std::nothrow) scatter_t (parent_, tid_, sid_);
            break;
        case ZMQ_DGRAM:
            sock = new (std::nothrow) dgram_t (parent_, tid_, sid_);
            break;
        default:
            errno = EINVAL;
            return NULL;
    }

    alloc_assert (sock);

    //  Common case: the constructor managed to set up a mailbox.
    if (sock->_mailbox != NULL)
        return sock;

    //  No mailbox: the socket is unusable. The destructor asserts on
    //  _destroyed, so the flag has to be raised before deleting.
    sock->_destroyed = true;
    LIBZMQ_DELETE (sock);
    return NULL;
}

222 223 224 225
//  Initialises the common socket state, seeds a few options from the
//  parent context and creates the mailbox. For a non-thread-safe socket
//  whose mailbox reports retired_fd, _mailbox is left NULL.
zmq::socket_base_t::socket_base_t (ctx_t *parent_,
                                   uint32_t tid_,
                                   int sid_,
                                   bool thread_safe_) :
    own_t (parent_, tid_),
    _tag (0xbaddecaf),
    _ctx_terminated (false),
    _destroyed (false),
    _poller (NULL),
    _handle (static_cast<poller_t::handle_t> (NULL)),
    _last_tsc (0),
    _ticks (0),
    _rcvmore (false),
    _monitor_socket (NULL),
    _monitor_events (0),
    _thread_safe (thread_safe_),
    _reaper_signaler (NULL),
    _sync (),
    _monitor_sync ()
{
    options.socket_id = sid_;

    //  Defaults inherited from the context.
    const bool ctx_ipv6 = parent_->get (ZMQ_IPV6) != 0;
    options.ipv6 = ctx_ipv6;
    const bool ctx_blocky = parent_->get (ZMQ_BLOCKY) != 0;
    options.linger.store (ctx_blocky ? -1 : 0);
    const bool ctx_zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV) != 0;
    options.zero_copy = ctx_zero_copy;

    if (!_thread_safe) {
        mailbox_t *plain_mailbox = new (std::nothrow) mailbox_t ();
        zmq_assert (plain_mailbox);

        //  A mailbox without a usable descriptor is discarded.
        if (plain_mailbox->get_fd () == retired_fd) {
            LIBZMQ_DELETE (plain_mailbox);
            _mailbox = NULL;
        } else
            _mailbox = plain_mailbox;
    } else {
        _mailbox = new (std::nothrow) mailbox_safe_t (&_sync);
        zmq_assert (_mailbox);
    }
}

263 264
int zmq::socket_base_t::get_peer_state (const void *routing_id_,
                                        size_t routing_id_size_) const
265
{
266 267
    LIBZMQ_UNUSED (routing_id_);
    LIBZMQ_UNUSED (routing_id_size_);
268

269 270 271 272 273
    //  Only ROUTER sockets support this
    errno = ENOTSUP;
    return -1;
}

274 275
//  Releases the mailbox and the reaper signaler, stops the monitor while
//  holding the monitor lock, and asserts that the socket was flagged as
//  destroyed before being deleted.
zmq::socket_base_t::~socket_base_t ()
{
    if (_mailbox) {
        LIBZMQ_DELETE (_mailbox);
    }

    if (_reaper_signaler) {
        LIBZMQ_DELETE (_reaper_signaler);
    }

    scoped_lock_t monitor_lock (_monitor_sync);
    stop_monitor ();

    zmq_assert (_destroyed);
}

288
//  Returns the socket's mailbox pointer as stored in _mailbox.
zmq::i_mailbox *zmq::socket_base_t::get_mailbox () const
{
    zmq::i_mailbox *const mailbox = _mailbox;
    return mailbox;
}

void zmq::socket_base_t::stop ()
{
295 296
    //  Called by ctx when it is terminated (zmq_ctx_term).
    //  'stop' command is sent from the threads that called zmq_ctx_term to
297 298 299 300 301
    //  the thread owning the socket. This way, blocking call in the
    //  owner thread can be interrupted.
    send_stop ();
}

302 303
// TODO consider renaming protocol_ to scheme_ in conformance with RFC 3986
// terminology, but this requires extensive changes to be consistent
304
int zmq::socket_base_t::parse_uri (const char *uri_,
305
                                   std::string &protocol_,
306
                                   std::string &path_)
307 308 309 310
{
    zmq_assert (uri_ != NULL);

    std::string uri (uri_);
311
    const std::string::size_type pos = uri.find ("://");
312 313 314 315 316
    if (pos == std::string::npos) {
        errno = EINVAL;
        return -1;
    }
    protocol_ = uri.substr (0, pos);
317
    path_ = uri.substr (pos + 3);
Martin Hurton's avatar
Martin Hurton committed
318

319
    if (protocol_.empty () || path_.empty ()) {
320 321 322 323 324 325
        errno = EINVAL;
        return -1;
    }
    return 0;
}

326
int zmq::socket_base_t::check_protocol (const std::string &protocol_) const
327
{
328
    //  First check out whether the protocol is something we are aware of.
329
    if (protocol_ != protocol_name::inproc
330 331
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
332
        && protocol_ != protocol_name::ipc
333
#endif
334
        && protocol_ != protocol_name::tcp
335
#if defined ZMQ_HAVE_OPENPGM
336 337 338
        //  pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
        && protocol_ != "pgm"
        && protocol_ != "epgm"
339 340
#endif
#if defined ZMQ_HAVE_TIPC
341
        // TIPC transport is only available on Linux.
342
        && protocol_ != protocol_name::tipc
343
#endif
344
#if defined ZMQ_HAVE_NORM
345
        && protocol_ != "norm"
346
#endif
347
#if defined ZMQ_HAVE_VMCI
348
        && protocol_ != protocol_name::vmci
349
#endif
350
        && protocol_ != protocol_name::udp) {
Ilya Kulakov's avatar
Ilya Kulakov committed
351 352 353 354
        errno = EPROTONOSUPPORT;
        return -1;
    }

355 356 357
        //  Check whether socket type and transport protocol match.
        //  Specifically, multicast protocols can't be combined with
        //  bi-directional messaging patterns (socket types).
358
#if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
359 360 361
    if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm")
        && options.type != ZMQ_PUB && options.type != ZMQ_SUB
        && options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
362 363 364
        errno = ENOCOMPATPROTO;
        return -1;
    }
365
#endif
366

367
    if (protocol_ == protocol_name::udp
368 369
        && (options.type != ZMQ_DISH && options.type != ZMQ_RADIO
            && options.type != ZMQ_DGRAM)) {
370
        errno = ENOCOMPATPROTO;
371
        return -1;
372
    }
373

374 375 376 377
    //  Protocol is available.
    return 0;
}

378 379 380
//  Takes a new pipe into the socket: registers this socket as the pipe's
//  event sink, remembers the pipe, and hands it to the derived socket type.
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
                                      bool subscribe_to_all_,
                                      bool locally_initiated_)
{
    //  Registration comes first so the pipe can be terminated later on.
    pipe_->set_event_sink (this);
    _pipes.push_back (pipe_);

    //  Notify the concrete socket type about the new pipe.
    xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);

    if (!is_terminating ())
        return;

    //  The socket is already closing: a pipe arriving now is asked to
    //  terminate straight away.
    register_term_acks (1);
    pipe_->terminate (false);
}

397 398 399
int zmq::socket_base_t::setsockopt (int option_,
                                    const void *optval_,
                                    size_t optvallen_)
400
{
401
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
somdoron's avatar
somdoron committed
402

403
    if (unlikely (_ctx_terminated)) {
404 405 406 407
        errno = ETERM;
        return -1;
    }

408 409
    //  First, check whether specific socket type overloads the option.
    int rc = xsetsockopt (option_, optval_, optvallen_);
somdoron's avatar
somdoron committed
410
    if (rc == 0 || errno != EINVAL) {
411
        return rc;
somdoron's avatar
somdoron committed
412
    }
413 414 415

    //  If the socket type doesn't support the option, pass it to
    //  the generic option parser.
somdoron's avatar
somdoron committed
416
    rc = options.setsockopt (option_, optval_, optvallen_);
417
    update_pipe_options (option_);
somdoron's avatar
somdoron committed
418 419

    return rc;
420 421
}

422 423 424
int zmq::socket_base_t::getsockopt (int option_,
                                    void *optval_,
                                    size_t *optvallen_)
425
{
426
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
somdoron's avatar
somdoron committed
427

428
    if (unlikely (_ctx_terminated)) {
429 430 431 432
        errno = ETERM;
        return -1;
    }

433
    if (option_ == ZMQ_RCVMORE) {
434
        return do_getsockopt<int> (optval_, optvallen_, _rcvmore ? 1 : 0);
435 436
    }

437
    if (option_ == ZMQ_FD) {
438
        if (_thread_safe) {
somdoron's avatar
somdoron committed
439 440 441 442
            // thread safe socket doesn't provide file descriptor
            errno = EINVAL;
            return -1;
        }
443

444
        return do_getsockopt<fd_t> (
445 446
          optval_, optvallen_,
          (static_cast<mailbox_t *> (_mailbox))->get_fd ());
447 448 449
    }

    if (option_ == ZMQ_EVENTS) {
450
        const int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
451
        if (rc != 0 && (errno == EINTR || errno == ETERM)) {
452
            return -1;
somdoron's avatar
somdoron committed
453
        }
454
        errno_assert (rc == 0);
455 456 457 458

        return do_getsockopt<int> (optval_, optvallen_,
                                   (has_out () ? ZMQ_POLLOUT : 0)
                                     | (has_in () ? ZMQ_POLLIN : 0));
459 460
    }

461
    if (option_ == ZMQ_LAST_ENDPOINT) {
462
        return do_getsockopt (optval_, optvallen_, _last_endpoint);
463 464
    }

465
    if (option_ == ZMQ_THREAD_SAFE) {
466
        return do_getsockopt<int> (optval_, optvallen_, _thread_safe ? 1 : 0);
Ilya Kulakov's avatar
Ilya Kulakov committed
467
    }
468

469
    return options.getsockopt (option_, optval_, optvallen_);
470 471
}

472
int zmq::socket_base_t::join (const char *group_)
somdoron's avatar
somdoron committed
473
{
474
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
somdoron's avatar
somdoron committed
475

476
    return xjoin (group_);
somdoron's avatar
somdoron committed
477 478
}

479
int zmq::socket_base_t::leave (const char *group_)
somdoron's avatar
somdoron committed
480
{
481
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
somdoron's avatar
somdoron committed
482

483
    return xleave (group_);
somdoron's avatar
somdoron committed
484 485
}

486
//  Registers a signaler with the mailbox. Asserts that the socket is
//  thread-safe, since the mailbox is cast to mailbox_safe_t.
void zmq::socket_base_t::add_signaler (signaler_t *s_)
{
    zmq_assert (_thread_safe);

    scoped_lock_t sync_lock (_sync);
    mailbox_safe_t *const safe_mailbox =
      static_cast<mailbox_safe_t *> (_mailbox);
    safe_mailbox->add_signaler (s_);
}

494
//  Unregisters a signaler from the mailbox. Asserts that the socket is
//  thread-safe, since the mailbox is cast to mailbox_safe_t.
void zmq::socket_base_t::remove_signaler (signaler_t *s_)
{
    zmq_assert (_thread_safe);

    scoped_lock_t sync_lock (_sync);
    mailbox_safe_t *const safe_mailbox =
      static_cast<mailbox_safe_t *> (_mailbox);
    safe_mailbox->remove_signaler (s_);
}

502
int zmq::socket_base_t::bind (const char *endpoint_uri_)
503
{
504
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
somdoron's avatar
somdoron committed
505

506
    if (unlikely (_ctx_terminated)) {
507 508 509 510
        errno = ETERM;
        return -1;
    }

511 512
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
513
    if (unlikely (rc != 0)) {
514
        return -1;
somdoron's avatar
somdoron committed
515
    }
516

517
    //  Parse endpoint_uri_ string.
518 519
    std::string protocol;
    std::string address;
520 521
    if (parse_uri (endpoint_uri_, protocol, address)
        || check_protocol (protocol)) {
522
        return -1;
somdoron's avatar
somdoron committed
523
    }
524

525
    if (protocol == protocol_name::inproc) {
526
        const endpoint_t endpoint = {this, options};
527
        rc = register_endpoint (endpoint_uri_, endpoint);
528
        if (rc == 0) {
529 530
            connect_pending (endpoint_uri_, this);
            _last_endpoint.assign (endpoint_uri_);
531
            options.connected = true;
532 533
        }
        return rc;
534
    }
535

536
    if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
537
        //  For convenience's sake, bind can be used interchangeable with
538
        //  connect for PGM, EPGM, NORM transports.
539
        rc = connect (endpoint_uri_);
540 541 542
        if (rc != -1)
            options.connected = true;
        return rc;
543 544
    }

545
    if (protocol == protocol_name::udp) {
546 547 548 549 550 551 552 553 554 555 556 557
        if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
            errno = ENOCOMPATPROTO;
            return -1;
        }

        //  Choose the I/O thread to run the session in.
        io_thread_t *io_thread = choose_io_thread (options.affinity);
        if (!io_thread) {
            errno = EMTHREAD;
            return -1;
        }

558 559
        address_t *paddr =
          new (std::nothrow) address_t (protocol, address, this->get_ctx ());
560 561 562 563
        alloc_assert (paddr);

        paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
        alloc_assert (paddr->resolved.udp_addr);
564 565
        rc = paddr->resolved.udp_addr->resolve (address.c_str (), true,
                                                options.ipv6);
566
        if (rc != 0) {
567
            LIBZMQ_DELETE (paddr);
568 569 570
            return -1;
        }

571 572
        session_base_t *session =
          session_base_t::create (io_thread, true, this, options, paddr);
573 574 575
        errno_assert (session);

        //  Create a bi-directional pipe.
576 577
        object_t *parents[2] = {this, session};
        pipe_t *new_pipes[2] = {NULL, NULL};
578

579 580
        int hwms[2] = {options.sndhwm, options.rcvhwm};
        bool conflates[2] = {false, false};
581 582 583 584
        rc = pipepair (parents, new_pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  Attach local end of the pipe to the socket object.
585
        attach_pipe (new_pipes[0], true, true);
586
        pipe_t *const newpipe = new_pipes[0];
587 588

        //  Attach remote end of the pipe to the session object later on.
589
        session->attach_pipe (new_pipes[1]);
590 591

        //  Save last endpoint URI
592
        paddr->to_string (_last_endpoint);
593

594
        add_endpoint (endpoint_uri_, static_cast<own_t *> (session), newpipe);
595 596 597 598

        return 0;
    }

599
    //  Remaining transports require to be run in an I/O thread, so at this
600 601 602 603 604 605
    //  point we'll choose one.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        return -1;
    }
606

607
    if (protocol == protocol_name::tcp) {
608 609
        tcp_listener_t *listener =
          new (std::nothrow) tcp_listener_t (io_thread, this, options);
610
        alloc_assert (listener);
611
        rc = listener->set_address (address.c_str ());
612
        if (rc != 0) {
613 614
            LIBZMQ_DELETE (listener);
            event_bind_failed (address, zmq_errno ());
615
            return -1;
616
        }
617

618
        // Save last endpoint URI
619
        listener->get_address (_last_endpoint);
620

621 622
        add_endpoint (_last_endpoint.c_str (), static_cast<own_t *> (listener),
                      NULL);
623
        options.connected = true;
624 625 626
        return 0;
    }

627 628
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
629
    if (protocol == protocol_name::ipc) {
630 631
        ipc_listener_t *listener =
          new (std::nothrow) ipc_listener_t (io_thread, this, options);
632
        alloc_assert (listener);
633
        int rc = listener->set_address (address.c_str ());
634
        if (rc != 0) {
635 636
            LIBZMQ_DELETE (listener);
            event_bind_failed (address, zmq_errno ());
637 638
            return -1;
        }
639

640
        // Save last endpoint URI
641
        listener->get_address (_last_endpoint);
642

643 644
        add_endpoint (_last_endpoint.c_str (), static_cast<own_t *> (listener),
                      NULL);
645
        options.connected = true;
646
        return 0;
647
    }
648
#endif
649
#if defined ZMQ_HAVE_TIPC
650
    if (protocol == protocol_name::tipc) {
651 652 653 654 655 656 657 658 659
        tipc_listener_t *listener =
          new (std::nothrow) tipc_listener_t (io_thread, this, options);
        alloc_assert (listener);
        int rc = listener->set_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE (listener);
            event_bind_failed (address, zmq_errno ());
            return -1;
        }
660 661

        // Save last endpoint URI
662
        listener->get_address (_last_endpoint);
663

664
        add_endpoint (endpoint_uri_, static_cast<own_t *> (listener), NULL);
665
        options.connected = true;
666 667 668
        return 0;
    }
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
669
#if defined ZMQ_HAVE_VMCI
670
    if (protocol == protocol_name::vmci) {
671 672
        vmci_listener_t *listener =
          new (std::nothrow) vmci_listener_t (io_thread, this, options);
Ilya Kulakov's avatar
Ilya Kulakov committed
673 674 675
        alloc_assert (listener);
        int rc = listener->set_address (address.c_str ());
        if (rc != 0) {
676
            LIBZMQ_DELETE (listener);
Ilya Kulakov's avatar
Ilya Kulakov committed
677 678 679 680
            event_bind_failed (address, zmq_errno ());
            return -1;
        }

681
        listener->get_address (_last_endpoint);
Ilya Kulakov's avatar
Ilya Kulakov committed
682

683 684
        add_endpoint (_last_endpoint.c_str (), static_cast<own_t *> (listener),
                      NULL);
Ilya Kulakov's avatar
Ilya Kulakov committed
685 686 687 688
        options.connected = true;
        return 0;
    }
#endif
689

690
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
691
    return -1;
692 693
}

694
int zmq::socket_base_t::connect (const char *endpoint_uri_)
695
{
696
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
somdoron's avatar
somdoron committed
697

698
    if (unlikely (_ctx_terminated)) {
699 700 701 702
        errno = ETERM;
        return -1;
    }

703 704
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
705
    if (unlikely (rc != 0)) {
706
        return -1;
somdoron's avatar
somdoron committed
707
    }
708

709
    //  Parse endpoint_uri_ string.
710 711
    std::string protocol;
    std::string address;
712 713
    if (parse_uri (endpoint_uri_, protocol, address)
        || check_protocol (protocol)) {
714
        return -1;
somdoron's avatar
somdoron committed
715
    }
malosek's avatar
malosek committed
716

717
    if (protocol == protocol_name::inproc) {
718 719 720 721
        //  TODO: inproc connect is specific with respect to creating pipes
        //  as there's no 'reconnect' functionality implemented. Once that
        //  is in place we should follow generic pipe creation algorithm.

722
        //  Find the peer endpoint.
723
        const endpoint_t peer = find_endpoint (endpoint_uri_);
724

725
        // The total HWM for an inproc connection should be the sum of
726
        // the binder's HWM and the connector's HWM.
727 728 729 730 731 732 733 734 735 736
        const int sndhwm = peer.socket == NULL
                             ? options.sndhwm
                             : options.sndhwm != 0 && peer.options.rcvhwm != 0
                                 ? options.sndhwm + peer.options.rcvhwm
                                 : 0;
        const int rcvhwm = peer.socket == NULL
                             ? options.rcvhwm
                             : options.rcvhwm != 0 && peer.options.sndhwm != 0
                                 ? options.rcvhwm + peer.options.sndhwm
                                 : 0;
737

738
        //  Create a bi-directional pipe to connect the peers.
739 740 741
        object_t *parents[2] = {this, peer.socket == NULL ? this : peer.socket};
        pipe_t *new_pipes[2] = {NULL, NULL};

742
        const bool conflate = get_effective_conflate_option (options);
743 744 745

        int hwms[2] = {conflate ? -1 : sndhwm, conflate ? -1 : rcvhwm};
        bool conflates[2] = {conflate, conflate};
746
        rc = pipepair (parents, new_pipes, hwms, conflates);
747
        if (!conflate) {
748 749 750
            new_pipes[0]->set_hwms_boost (peer.options.sndhwm,
                                          peer.options.rcvhwm);
            new_pipes[1]->set_hwms_boost (options.sndhwm, options.rcvhwm);
751 752
        }

753
        errno_assert (rc == 0);
754

755 756
        if (!peer.socket) {
            //  The peer doesn't exist yet so we don't know whether
757 758
            //  to send the routing id message or not. To resolve this,
            //  we always send our routing id and drop it later if
759
            //  the peer doesn't expect it.
760
            send_routing_id (new_pipes[0], options);
761

Martin Hurton's avatar
Martin Hurton committed
762
            const endpoint_t endpoint = {this, options};
763
            pend_connection (std::string (endpoint_uri_), endpoint, new_pipes);
764
        } else {
765 766
            //  If required, send the routing id of the local socket to the peer.
            if (peer.options.recv_routing_id) {
767
                send_routing_id (new_pipes[0], options);
768
            }
769

770 771
            //  If required, send the routing id of the peer to the local socket.
            if (options.recv_routing_id) {
772
                send_routing_id (new_pipes[1], peer.options);
773
            }
774

775 776 777
            //  Attach remote end of the pipe to the peer socket. Note that peer's
            //  seqnum was incremented in find_endpoint function. We don't need it
            //  increased here.
778
            send_bind (peer.socket, new_pipes[1], false);
779
        }
780

Martin Hurton's avatar
Martin Hurton committed
781
        //  Attach local end of the pipe to this socket object.
782
        attach_pipe (new_pipes[0], false, true);
Martin Hurton's avatar
Martin Hurton committed
783

784
        // Save last endpoint URI
785
        _last_endpoint.assign (endpoint_uri_);
786

787
        // remember inproc connections for disconnect
788
        _inprocs.emplace (endpoint_uri_, new_pipes[0]);
789

790
        options.connected = true;
791 792
        return 0;
    }
793
    const bool is_single_connect =
794 795
      (options.type == ZMQ_DEALER || options.type == ZMQ_SUB
       || options.type == ZMQ_PUB || options.type == ZMQ_REQ);
796
    if (unlikely (is_single_connect)) {
797
        if (0 != _endpoints.count (endpoint_uri_)) {
798 799 800 801 802 803
            // There is no valid use for multiple connects for SUB-PUB nor
            // DEALER-ROUTER nor REQ-REP. Multiple connects produces
            // nonsensical results.
            return 0;
        }
    }
804

805 806 807 808 809 810 811
    //  Choose the I/O thread to run the session in.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        return -1;
    }

812 813
    address_t *paddr =
      new (std::nothrow) address_t (protocol, address, this->get_ctx ());
814
    alloc_assert (paddr);
815 816

    //  Resolve address (if needed by the protocol)
817
    if (protocol == protocol_name::tcp) {
818 819 820
        //  Do some basic sanity checks on tcp:// address syntax
        //  - hostname starts with digit or letter, with embedded '-' or '.'
        //  - IPv6 address may contain hex chars and colons.
821 822
        //  - IPv6 link local address may contain % followed by interface name / zone_id
        //    (Reference: https://tools.ietf.org/html/rfc4007)
823 824 825 826 827 828
        //  - IPv4 address may contain decimal digits and dots.
        //  - Address must end in ":port" where port is *, or numeric
        //  - Address may contain two parts separated by ':'
        //  Following code is quick and dirty check to catch obvious errors,
        //  without trying to be fully accurate.
        const char *check = address.c_str ();
829 830
        if (isalnum (*check) || isxdigit (*check) || *check == '['
            || *check == ':') {
831
            check++;
832 833 834 835
            while (isalnum (*check) || isxdigit (*check) || *check == '.'
                   || *check == '-' || *check == ':' || *check == '%'
                   || *check == ';' || *check == '[' || *check == ']'
                   || *check == '_' || *check == '*') {
836
                check++;
837
            }
838 839 840 841 842 843 844 845 846 847
        }
        //  Assume the worst, now look for success
        rc = -1;
        //  Did we reach the end of the address safely?
        if (*check == 0) {
            //  Do we have a valid port string? (cannot be '*' in connect
            check = strrchr (address.c_str (), ':');
            if (check) {
                check++;
                if (*check && (isdigit (*check)))
848
                    rc = 0; //  Valid
849 850 851
            }
        }
        if (rc == -1) {
852
            errno = EINVAL;
853
            LIBZMQ_DELETE (paddr);
854 855 856
            return -1;
        }
        //  Defer resolution until a socket is opened
857
        paddr->resolved.tcp_addr = NULL;
858
    }
859 860
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
861
    else if (protocol == protocol_name::ipc) {
862
        paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
863
        alloc_assert (paddr->resolved.ipc_addr);
864 865
        int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
        if (rc != 0) {
866
            LIBZMQ_DELETE (paddr);
867 868 869
            return -1;
        }
    }
870
#endif
871

872
    if (protocol == protocol_name::udp) {
873 874
        if (options.type != ZMQ_RADIO) {
            errno = ENOCOMPATPROTO;
875
            LIBZMQ_DELETE (paddr);
876 877
            return -1;
        }
878

879 880
        paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
        alloc_assert (paddr->resolved.udp_addr);
881 882
        rc = paddr->resolved.udp_addr->resolve (address.c_str (), false,
                                                options.ipv6);
883
        if (rc != 0) {
884
            LIBZMQ_DELETE (paddr);
885 886
            return -1;
        }
887 888
    }

889
        // TBD - Should we check address for ZMQ_HAVE_NORM???
890

891 892 893 894
#ifdef ZMQ_HAVE_OPENPGM
    if (protocol == "pgm" || protocol == "epgm") {
        struct pgm_addrinfo_t *res = NULL;
        uint16_t port_number = 0;
895 896
        int rc =
          pgm_socket_t::init_address (address.c_str (), &res, &port_number);
897 898
        if (res != NULL)
            pgm_freeaddrinfo (res);
899
        if (rc != 0 || port_number == 0) {
900
            return -1;
901
        }
902
    }
903
#endif
904
#if defined ZMQ_HAVE_TIPC
905
    else if (protocol == protocol_name::tipc) {
906 907
        paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
        alloc_assert (paddr->resolved.tipc_addr);
908
        int rc = paddr->resolved.tipc_addr->resolve (address.c_str ());
909
        if (rc != 0) {
910
            LIBZMQ_DELETE (paddr);
911 912
            return -1;
        }
913 914 915 916 917 918 919 920 921
        sockaddr_tipc *saddr =
          (sockaddr_tipc *) paddr->resolved.tipc_addr->addr ();
        // Cannot connect to random Port Identity
        if (saddr->addrtype == TIPC_ADDR_ID
            && paddr->resolved.tipc_addr->is_random ()) {
            LIBZMQ_DELETE (paddr);
            errno = EINVAL;
            return -1;
        }
922 923
    }
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
924
#if defined ZMQ_HAVE_VMCI
925
    else if (protocol == protocol_name::vmci) {
926 927
        paddr->resolved.vmci_addr =
          new (std::nothrow) vmci_address_t (this->get_ctx ());
Ilya Kulakov's avatar
Ilya Kulakov committed
928 929 930
        alloc_assert (paddr->resolved.vmci_addr);
        int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
        if (rc != 0) {
931
            LIBZMQ_DELETE (paddr);
Ilya Kulakov's avatar
Ilya Kulakov committed
932 933 934 935
            return -1;
        }
    }
#endif
936

937
    //  Create session.
938 939
    session_base_t *session =
      session_base_t::create (io_thread, true, this, options, paddr);
940
    errno_assert (session);
941

942
    //  PGM does not support subscription forwarding; ask for all data to be
bebopagogo's avatar
bebopagogo committed
943
    //  sent to this pipe. (same for NORM, currently?)
944 945 946
    const bool subscribe_to_all = protocol == "pgm" || protocol == "epgm"
                                  || protocol == "norm"
                                  || protocol == protocol_name::udp;
947
    pipe_t *newpipe = NULL;
948

949
    if (options.immediate != 1 || subscribe_to_all) {
950
        //  Create a bi-directional pipe.
951 952 953
        object_t *parents[2] = {this, session};
        pipe_t *new_pipes[2] = {NULL, NULL};

954
        const bool conflate = get_effective_conflate_option (options);
955 956 957 958

        int hwms[2] = {conflate ? -1 : options.sndhwm,
                       conflate ? -1 : options.rcvhwm};
        bool conflates[2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
959
        rc = pipepair (parents, new_pipes, hwms, conflates);
960
        errno_assert (rc == 0);
961

962
        //  Attach local end of the pipe to the socket object.
963
        attach_pipe (new_pipes[0], subscribe_to_all, true);
964
        newpipe = new_pipes[0];
Martin Sustrik's avatar
Martin Sustrik committed
965

966
        //  Attach remote end of the pipe to the session object later on.
967
        session->attach_pipe (new_pipes[1]);
968 969 970
    }

    //  Save last endpoint URI
971
    paddr->to_string (_last_endpoint);
972

973
    add_endpoint (endpoint_uri_, static_cast<own_t *> (session), newpipe);
974 975 976
    return 0;
}

977
std::string zmq::socket_base_t::resolve_tcp_addr (std::string endpoint_uri_,
978 979 980 981 982 983 984
                                                  const char *tcp_address_)
{
    // The resolved last_endpoint is used as a key in the endpoints map.
    // The address passed by the user might not match in the TCP case due to
    // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
    // resolve before giving up. Given at this stage we don't know whether a
    // socket is connected or bound, try with both.
985
    if (_endpoints.find (endpoint_uri_) == _endpoints.end ()) {
986 987 988 989 990
        tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
        alloc_assert (tcp_addr);
        int rc = tcp_addr->resolve (tcp_address_, false, options.ipv6);

        if (rc == 0) {
991 992
            tcp_addr->to_string (endpoint_uri_);
            if (_endpoints.find (endpoint_uri_) == _endpoints.end ()) {
993 994
                rc = tcp_addr->resolve (tcp_address_, true, options.ipv6);
                if (rc == 0) {
995
                    tcp_addr->to_string (endpoint_uri_);
996 997 998 999 1000
                }
            }
        }
        LIBZMQ_DELETE (tcp_addr);
    }
1001
    return endpoint_uri_;
1002 1003
}

1004
void zmq::socket_base_t::add_endpoint (const char *endpoint_uri_,
1005
                                       own_t *endpoint_,
1006
                                       pipe_t *pipe_)
1007
{
1008
    //  Activate the session. Make it a child of this socket.
1009
    launch_child (endpoint_);
1010
    _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (endpoint_uri_),
1011
                                          endpoint_pipe_t (endpoint_, pipe_));
1012 1013
}

1014
int zmq::socket_base_t::term_endpoint (const char *endpoint_uri_)
1015
{
1016
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
somdoron's avatar
somdoron committed
1017

1018
    //  Check whether the library haven't been shut down yet.
1019
    if (unlikely (_ctx_terminated)) {
1020 1021 1022
        errno = ETERM;
        return -1;
    }
malosek's avatar
malosek committed
1023

1024
    //  Check whether endpoint address passed to the function is valid.
1025
    if (unlikely (!endpoint_uri_)) {
1026 1027 1028 1029
        errno = EINVAL;
        return -1;
    }

1030 1031
    //  Process pending commands, if any, since there could be pending unprocessed process_own()'s
    //  (from launch_child() for example) we're asked to terminate now.
1032
    const int rc = process_commands (0, false);
1033
    if (unlikely (rc != 0)) {
1034
        return -1;
somdoron's avatar
somdoron committed
1035
    }
1036

1037 1038 1039 1040 1041
    //  Parse endpoint_uri_ string.
    std::string uri_protocol;
    std::string uri_path;
    if (parse_uri (endpoint_uri_, uri_protocol, uri_path)
        || check_protocol (uri_protocol)) {
1042
        return -1;
somdoron's avatar
somdoron committed
1043
    }
1044

1045
    const std::string endpoint_uri_str = std::string (endpoint_uri_);
1046

1047
    // Disconnect an inproc socket
1048
    if (uri_protocol == protocol_name::inproc) {
1049
        return unregister_endpoint (endpoint_uri_str, this) == 0
1050
                 ? 0
1051
                 : _inprocs.erase_pipes (endpoint_uri_str);
1052 1053
    }

1054 1055 1056 1057
    const std::string resolved_endpoint_uri =
      uri_protocol == protocol_name::tcp
        ? resolve_tcp_addr (endpoint_uri_str, uri_path.c_str ())
        : endpoint_uri_str;
1058

1059
    //  Find the endpoints range (if any) corresponding to the endpoint_uri_ string.
1060
    const std::pair<endpoints_t::iterator, endpoints_t::iterator> range =
1061
      _endpoints.equal_range (resolved_endpoint_uri);
1062 1063
    if (range.first == range.second) {
        errno = ENOENT;
1064
        return -1;
1065
    }
1066

1067 1068 1069
    for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
        //  If we have an associated pipe, terminate it.
        if (it->second.second != NULL)
Martin Hurton's avatar
Martin Hurton committed
1070
            it->second.second->terminate (false);
1071
        term_child (it->second.first);
1072
    }
1073
    _endpoints.erase (range.first, range.second);
1074
    return 0;
1075 1076
}

1077
//  Sends msg_ through the socket-type specific xsend().
//
//  flags_ may contain ZMQ_SNDMORE (marks the message as having more parts)
//  and ZMQ_DONTWAIT (never block). Returns 0 on success, -1 on failure with
//  errno set: ETERM when the context was terminated, EFAULT for an invalid
//  message, EAGAIN when the message could not be sent without blocking or
//  before the send timeout expired; other values come from the callees.
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

    //  The socket is unusable once the library has been shut down.
    if (unlikely (_ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Reject missing or invalid messages.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Process pending commands, if any (throttled).
    if (unlikely (process_commands (0, true) != 0))
        return -1;

    //  Drop any user-visible 'more' flag left on the message and impose the
    //  one requested through flags_ instead.
    msg_->reset_flags (msg_t::more);
    if (flags_ & ZMQ_SNDMORE)
        msg_->set_flags (msg_t::more);

    msg_->reset_metadata ();

    //  First attempt, using the method of the concrete socket class.
    if (xsend (msg_) == 0)
        return 0;
    if (unlikely (errno != EAGAIN))
        return -1;

    //  For a non-blocking send simply propagate EAGAIN up the stack.
    if ((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0)
        return -1;

    //  Work out when the timeout expires. A negative timeout means
    //  "wait forever", in which case the deadline is never consulted.
    int remaining_ms = options.sndtimeo;
    uint64_t deadline = 0;
    if (remaining_ms >= 0)
        deadline = _clock.now_ms () + remaining_ms;

    //  We couldn't send the message. Wait for the next command, process it
    //  and retry, until the message goes out, an error occurs or the
    //  timeout is reached (reported as EAGAIN).
    for (;;) {
        if (unlikely (process_commands (remaining_ms, false) != 0))
            return -1;

        if (xsend (msg_) == 0)
            return 0;
        if (unlikely (errno != EAGAIN))
            return -1;

        if (remaining_ms > 0) {
            remaining_ms = static_cast<int> (deadline - _clock.now_ms ());
            if (remaining_ms <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
    }
}

1153
//  Receives a message into msg_ through the socket-type specific xrecv().
//
//  flags_ may contain ZMQ_DONTWAIT (never block). Returns 0 on success and
//  a negative value on failure with errno set: ETERM when the context was
//  terminated, EFAULT for an invalid message, EAGAIN when nothing could be
//  received without blocking or before the receive timeout expired; other
//  values come from the callees.
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

    //  The socket is unusable once the library has been shut down.
    if (unlikely (_ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Reject missing or invalid messages.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Once every inbound_poll_rate messages check for signals and process
    //  incoming commands. This happens only if we are not polling altogether
    //  because there are messages available all the time. If poll occurs,
    //  ticks is set to zero and thus we avoid this code.
    //
    //  Note that 'recv' uses a different command throttling algorithm (the
    //  one described above) from the one used by 'send'. This is because
    //  counting ticks is more efficient than doing RDTSC all the time.
    if (++_ticks == inbound_poll_rate) {
        if (unlikely (process_commands (0, false) != 0))
            return -1;
        _ticks = 0;
    }

    //  First attempt to get a message.
    int rc = xrecv (msg_);
    if (rc == 0) {
        //  Got one straight away.
        extract_flags (msg_);
        return 0;
    }
    if (unlikely (errno != EAGAIN))
        return -1;

    //  Nothing available right now. For a non-blocking recv, commands are
    //  processed once in case an activate_reader command is already waiting
    //  in the command pipe, then a single retry is made.
    if ((flags_ & ZMQ_DONTWAIT) || options.rcvtimeo == 0) {
        if (unlikely (process_commands (0, false) != 0))
            return -1;
        _ticks = 0;

        rc = xrecv (msg_);
        if (rc < 0)
            return rc;

        extract_flags (msg_);
        return 0;
    }

    //  Work out when the timeout expires. A negative timeout means
    //  "wait forever", in which case the deadline is never consulted.
    int remaining_ms = options.rcvtimeo;
    uint64_t deadline = 0;
    if (remaining_ms >= 0)
        deadline = _clock.now_ms () + remaining_ms;

    //  In the blocking scenario commands are processed over and over again
    //  until a message can be fetched. The first pass only waits when the
    //  tick counter is non-zero; every later pass waits.
    bool may_wait = (_ticks != 0);
    for (;;) {
        const int wait_ms = may_wait ? remaining_ms : 0;
        if (unlikely (process_commands (wait_ms, false) != 0))
            return -1;

        rc = xrecv (msg_);
        if (rc == 0) {
            _ticks = 0;
            extract_flags (msg_);
            return 0;
        }
        if (unlikely (errno != EAGAIN))
            return -1;

        may_wait = true;
        if (remaining_ms > 0) {
            remaining_ms = static_cast<int> (deadline - _clock.now_ms ());
            if (remaining_ms <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
    }
}

int zmq::socket_base_t::close ()
{
1251
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1252

1253
    //  Remove all existing signalers for thread safe sockets
1254 1255
    if (_thread_safe)
        (static_cast<mailbox_safe_t *> (_mailbox))->clear_signalers ();
1256

1257
    //  Mark the socket as dead
1258
    _tag = 0xdeadbeef;
1259

1260

1261 1262 1263 1264
    //  Transfer the ownership of the socket from this application thread
    //  to the reaper thread which will take care of the rest of shutdown
    //  process.
    send_reap (this);
1265

1266 1267 1268
    return 0;
}

1269 1270 1271 1272 1273 1274 1275 1276 1277 1278
//  Forwards to xhas_in () and returns its result.
bool zmq::socket_base_t::has_in ()
{
    const bool readable = xhas_in ();
    return readable;
}

//  Forwards to xhas_out () and returns its result.
bool zmq::socket_base_t::has_out ()
{
    const bool writable = xhas_out ();
    return writable;
}

1279
//  Plugs the socket into the reaper thread's poller (poller_) and kicks off
//  termination.
//
//  For ordinary sockets the mailbox file descriptor is polled directly. For
//  thread safe sockets a dedicated signaler is created, registered with the
//  safe mailbox and polled instead.
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
    //  Plug the socket to the reaper thread.
    _poller = poller_;

    fd_t fd;

    if (!_thread_safe)
        fd = (static_cast<mailbox_t *> (_mailbox))->get_fd ();
    else {
        //  This branch only runs for thread safe sockets, so the lock is
        //  always taken here.
        scoped_optional_lock_t sync_lock (&_sync);

        //  Allocation failures are reported with alloc_assert, like every
        //  other new (std::nothrow) in this file.
        _reaper_signaler = new (std::nothrow) signaler_t ();
        alloc_assert (_reaper_signaler);

        //  Add signaler to the safe mailbox
        fd = _reaper_signaler->get_fd ();
        (static_cast<mailbox_safe_t *> (_mailbox))
          ->add_signaler (_reaper_signaler);

        //  Send a signal to make sure reaper handle existing commands
        _reaper_signaler->send ();
    }

    _handle = _poller->add_fd (fd, this);
    _poller->set_pollin (_handle);

    //  Initialise the termination and check whether it can be deallocated
    //  immediately.
    terminate ();
    check_destroy ();
}

1312
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
Martin Sustrik's avatar
Martin Sustrik committed
1313
{
1314
    if (timeout_ == 0) {
1315 1316 1317
        //  If we are asked not to wait, check whether we haven't processed
        //  commands recently, so that we can throttle the new commands.

Martin Sustrik's avatar
Martin Sustrik committed
1318
        //  Get the CPU's tick counter. If 0, the counter is not available.
Martin Hurton's avatar
Martin Hurton committed
1319
        const uint64_t tsc = zmq::clock_t::rdtsc ();
Martin Sustrik's avatar
Martin Sustrik committed
1320

1321 1322 1323 1324 1325 1326
        //  Optimised version of command processing - it doesn't have to check
        //  for incoming commands each time. It does so only if certain time
        //  elapsed since last command processing. Command delay varies
        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
        //  etc. The optimisation makes sense only on platforms where getting
        //  a timestamp is a very cheap operation (tens of nanoseconds).
Martin Sustrik's avatar
Martin Sustrik committed
1327
        if (tsc && throttle_) {
Martin Sustrik's avatar
Martin Sustrik committed
1328 1329 1330
            //  Check whether TSC haven't jumped backwards (in case of migration
            //  between CPU cores) and whether certain time have elapsed since
            //  last command processing. If it didn't do nothing.
1331
            if (tsc >= _last_tsc && tsc - _last_tsc <= max_command_delay)
1332
                return 0;
1333
            _last_tsc = tsc;
1334
        }
1335
    }
Martin Sustrik's avatar
Martin Sustrik committed
1336

1337 1338 1339 1340
    //  Check whether there are any commands pending for this thread.
    command_t cmd;
    int rc = _mailbox->recv (&cmd, timeout_);

1341 1342
    //  Process all available commands.
    while (rc == 0) {
1343
        cmd.destination->process_command (cmd);
1344
        rc = _mailbox->recv (&cmd, 0);
1345 1346 1347 1348 1349 1350
    }

    if (errno == EINTR)
        return -1;

    zmq_assert (errno == EAGAIN);
1351

1352
    if (_ctx_terminated) {
1353 1354
        errno = ETERM;
        return -1;
1355
    }
1356 1357

    return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1358 1359
}

1360
void zmq::socket_base_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
1361
{
1362
    //  Here, someone have called zmq_ctx_term while the socket was still alive.
1363
    //  We'll remember the fact so that any blocking call is interrupted and any
1364 1365
    //  further attempt to use the socket will return ETERM. The user is still
    //  responsible for calling zmq_close on the socket though!
1366
    scoped_lock_t lock (_monitor_sync);
1367
    stop_monitor ();
1368

1369
    _ctx_terminated = true;
Martin Sustrik's avatar
Martin Sustrik committed
1370 1371
}

1372
//  Handles the 'bind' command by attaching the supplied pipe to this socket.
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
{
    pipe_t *const incoming_pipe = pipe_;
    attach_pipe (incoming_pipe);
}

1377
void zmq::socket_base_t::process_term (int linger_)
1378
{
1379 1380 1381 1382 1383
    //  Unregister all inproc endpoints associated with this socket.
    //  Doing this we make sure that no new pipes from other sockets (inproc)
    //  will be initiated.
    unregister_endpoints (this);

1384
    //  Ask all attached pipes to terminate.
1385 1386 1387
    for (pipes_t::size_type i = 0; i != _pipes.size (); ++i)
        _pipes[i]->terminate (false);
    register_term_acks (static_cast<int> (_pipes.size ()));
1388

1389
    //  Continue the termination process immediately.
1390
    own_t::process_term (linger_);
1391 1392
}

1393 1394
void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
{
1395
    term_endpoint (endpoint_->c_str ());
1396 1397 1398
    delete endpoint_;
}

1399
void zmq::socket_base_t::update_pipe_options (int option_)
1400
{
1401
    if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {
1402 1403 1404
        for (pipes_t::size_type i = 0; i != _pipes.size (); ++i) {
            _pipes[i]->set_hwms (options.rcvhwm, options.sndhwm);
            _pipes[i]->send_hwms_to_peer (options.sndhwm, options.rcvhwm);
1405 1406
        }
    }
1407 1408
}

1409 1410
void zmq::socket_base_t::process_destroy ()
{
1411
    _destroyed = true;
1412 1413
}

1414
//  Base implementation: no option is accepted. Sets errno to EINVAL and
//  returns -1.
int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
{
    const int failure = -1;
    errno = EINVAL;
    return failure;
}
1419 1420 1421 1422 1423 1424

//  Base implementation: always reports false.
bool zmq::socket_base_t::xhas_out ()
{
    const bool can_send = false;
    return can_send;
}

1425
//  Base implementation: sending is not supported. Sets errno to ENOTSUP
//  and returns -1.
int zmq::socket_base_t::xsend (msg_t *)
{
    const int failure = -1;
    errno = ENOTSUP;
    return failure;
}

//  Base implementation: always reports false.
bool zmq::socket_base_t::xhas_in ()
{
    const bool can_recv = false;
    return can_recv;
}

somdoron's avatar
somdoron committed
1436 1437
int zmq::socket_base_t::xjoin (const char *group_)
{
1438
    LIBZMQ_UNUSED (group_);
somdoron's avatar
somdoron committed
1439 1440 1441 1442 1443 1444
    errno = ENOTSUP;
    return -1;
}

int zmq::socket_base_t::xleave (const char *group_)
{
1445
    LIBZMQ_UNUSED (group_);
somdoron's avatar
somdoron committed
1446 1447 1448 1449
    errno = ENOTSUP;
    return -1;
}

1450
//  Base implementation: receiving is not supported. Sets errno to ENOTSUP
//  and returns -1.
int zmq::socket_base_t::xrecv (msg_t *)
{
    const int failure = -1;
    errno = ENOTSUP;
    return failure;
}

1456
//  Base implementation: reaching this function is treated as a fatal error
//  (the assertion below always fails).
void zmq::socket_base_t::xread_activated (pipe_t *)
{
    zmq_assert (false);
}
1460
//  Base implementation: reaching this function is treated as a fatal error
//  (the assertion below always fails).
void zmq::socket_base_t::xwrite_activated (pipe_t *)
{
    zmq_assert (false);
}

1465
//  Base implementation: reaching this function is treated as a fatal error
//  (the assertion below always fails).
void zmq::socket_base_t::xhiccuped (pipe_t *)
{
    zmq_assert (false);
}

1470 1471
void zmq::socket_base_t::in_event ()
{
1472 1473 1474 1475
    //  This function is invoked only once the socket is running in the context
    //  of the reaper thread. Process any commands from other threads/sockets
    //  that may be available at the moment. Ultimately, the socket will
    //  be destroyed.
1476
    {
1477
        scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1478

1479
        //  If the socket is thread safe we need to unsignal the reaper signaler
1480 1481
        if (_thread_safe)
            _reaper_signaler->recv ();
1482

1483 1484 1485
        process_commands (0, false);
    }
    check_destroy ();
1486 1487 1488 1489 1490 1491 1492
}

//  Never expected to be called: the assertion below always fails.
void zmq::socket_base_t::out_event ()
{
    zmq_assert (false);
}

1493
void zmq::socket_base_t::timer_event (int)
1494 1495 1496
{
    zmq_assert (false);
}
1497

1498 1499
void zmq::socket_base_t::check_destroy ()
{
1500
    //  If the object was already marked as destroyed, finish the deallocation.
1501
    if (_destroyed) {
1502
        //  Remove the socket from the reaper's poller.
1503
        _poller->rm_fd (_handle);
1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514

        //  Remove the socket from the context.
        destroy_socket (this);

        //  Notify the reaper about the fact.
        send_reaped ();

        //  Deallocate.
        own_t::process_destroy ();
    }
}
1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525

//  Pipe event: forwards the notification for pipe_ to xread_activated ().
void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
    pipe_t *const source_pipe = pipe_;
    xread_activated (source_pipe);
}

//  Pipe event: forwards the notification for pipe_ to xwrite_activated ().
void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
    pipe_t *const target_pipe = pipe_;
    xwrite_activated (target_pipe);
}

1526 1527
//  Pipe event: pipe_ has hiccuped. With options.immediate set to 1 the pipe
//  is terminated; otherwise the notification goes to xhiccuped ().
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
{
    if (options.immediate != 1) {
        //  Notify derived sockets of the hiccup
        xhiccuped (pipe_);
        return;
    }

    pipe_->terminate (false);
}

1535
//  Pipe event: pipe_ has terminated. Drops every reference this socket
//  holds to the pipe and, while shutting down, acknowledges the
//  termination.
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
{
    //  Let the specific socket type know first.
    xpipe_terminated (pipe_);

    //  Forget the pipe in the inproc bookkeeping...
    _inprocs.erase_pipe (pipe_);

    //  ...and in the list of attached pipes.
    _pipes.erase (pipe_);

    //  Confirm the pipe's termination if we are already shutting down.
    if (is_terminating ()) {
        unregister_term_ack ();
    }
}

1550 1551
//  Inspects the flags of a freshly received message: asserts that a
//  routing_id flag is only seen when options.recv_routing_id is set, and
//  records in _rcvmore whether the 'more' flag is set.
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
    //  Test whether routing_id flag is valid for this socket type.
    if (unlikely (msg_->flags () & msg_t::routing_id)) {
        zmq_assert (options.recv_routing_id);
    }

    //  Remember whether more message parts follow. The message itself is
    //  not modified here.
    const bool more_parts = (msg_->flags () & msg_t::more) != 0;
    _rcvmore = more_parts;
}
1559

1560
int zmq::socket_base_t::monitor (const char *endpoint_, int events_)
1561
{
1562
    scoped_lock_t lock (_monitor_sync);
1563

1564
    if (unlikely (_ctx_terminated)) {
1565 1566 1567
        errno = ETERM;
        return -1;
    }
1568

1569
    //  Support deregistering monitoring endpoints as well
1570
    if (endpoint_ == NULL) {
1571 1572 1573
        stop_monitor ();
        return 0;
    }
1574
    //  Parse endpoint_uri_ string.
1575 1576
    std::string protocol;
    std::string address;
1577
    if (parse_uri (endpoint_, protocol, address) || check_protocol (protocol))
1578 1579
        return -1;

1580
    //  Event notification only supported over inproc://
1581
    if (protocol != protocol_name::inproc) {
1582 1583 1584
        errno = EPROTONOSUPPORT;
        return -1;
    }
1585
    // already monitoring. Stop previous monitor before starting new one.
1586
    if (_monitor_socket != NULL) {
1587 1588
        stop_monitor (true);
    }
1589
    //  Register events to monitor
1590 1591 1592
    _monitor_events = events_;
    _monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
    if (_monitor_socket == NULL)
1593 1594
        return -1;

1595
    //  Never block context termination on pending event messages
1596
    int linger = 0;
1597
    int rc =
1598
      zmq_setsockopt (_monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1599
    if (rc == -1)
1600
        stop_monitor (false);
1601

1602
    //  Spawn the monitor socket endpoint
1603
    rc = zmq_bind (_monitor_socket, endpoint_);
1604
    if (rc == -1)
1605
        stop_monitor (false);
1606 1607 1608
    return rc;
}

1609
void zmq::socket_base_t::event_connected (const std::string &endpoint_uri_,
1610
                                          zmq::fd_t fd_)
1611
{
1612
    event (endpoint_uri_, fd_, ZMQ_EVENT_CONNECTED);
1613
}
1614

1615 1616
void zmq::socket_base_t::event_connect_delayed (
  const std::string &endpoint_uri_, int err_)
1617
{
1618
    event (endpoint_uri_, err_, ZMQ_EVENT_CONNECT_DELAYED);
1619
}
1620

1621 1622
void zmq::socket_base_t::event_connect_retried (
  const std::string &endpoint_uri_, int interval_)
1623
{
1624
    event (endpoint_uri_, interval_, ZMQ_EVENT_CONNECT_RETRIED);
1625 1626
}

1627
void zmq::socket_base_t::event_listening (const std::string &endpoint_uri_,
1628
                                          zmq::fd_t fd_)
1629
{
1630
    event (endpoint_uri_, fd_, ZMQ_EVENT_LISTENING);
1631 1632
}

1633 1634
void zmq::socket_base_t::event_bind_failed (const std::string &endpoint_uri_,
                                            int err_)
1635
{
1636
    event (endpoint_uri_, err_, ZMQ_EVENT_BIND_FAILED);
1637 1638
}

1639
void zmq::socket_base_t::event_accepted (const std::string &endpoint_uri_,
1640
                                         zmq::fd_t fd_)
1641
{
1642
    event (endpoint_uri_, fd_, ZMQ_EVENT_ACCEPTED);
1643 1644
}

1645
void zmq::socket_base_t::event_accept_failed (const std::string &endpoint_uri_,
1646
                                              int err_)
1647
{
1648
    event (endpoint_uri_, err_, ZMQ_EVENT_ACCEPT_FAILED);
1649 1650
}

1651 1652
void zmq::socket_base_t::event_closed (const std::string &endpoint_uri_,
                                       zmq::fd_t fd_)
1653
{
1654
    event (endpoint_uri_, fd_, ZMQ_EVENT_CLOSED);
1655
}
Martin Hurton's avatar
Martin Hurton committed
1656

1657 1658
void zmq::socket_base_t::event_close_failed (const std::string &endpoint_uri_,
                                             int err_)
1659
{
1660
    event (endpoint_uri_, err_, ZMQ_EVENT_CLOSE_FAILED);
1661 1662
}

1663
void zmq::socket_base_t::event_disconnected (const std::string &endpoint_uri_,
1664
                                             zmq::fd_t fd_)
1665
{
1666
    event (endpoint_uri_, fd_, ZMQ_EVENT_DISCONNECTED);
1667 1668
}

1669
void zmq::socket_base_t::event_handshake_failed_no_detail (
1670
  const std::string &endpoint_uri_, int err_)
Vincent Tellier's avatar
Vincent Tellier committed
1671
{
1672
    event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
Vincent Tellier's avatar
Vincent Tellier committed
1673 1674
}

1675
void zmq::socket_base_t::event_handshake_failed_protocol (
1676
  const std::string &endpoint_uri_, int err_)
1677
{
1678
    event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
1679 1680
}

1681 1682
void zmq::socket_base_t::event_handshake_failed_auth (
  const std::string &endpoint_uri_, int err_)
1683
{
1684
    event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
1685 1686
}

1687 1688
void zmq::socket_base_t::event_handshake_succeeded (
  const std::string &endpoint_uri_, int err_)
1689
{
1690
    event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
Vincent Tellier's avatar
Vincent Tellier committed
1691 1692
}

1693
void zmq::socket_base_t::event (const std::string &endpoint_uri_,
1694 1695
                                intptr_t value_,
                                int type_)
1696
{
1697 1698
    scoped_lock_t lock (_monitor_sync);
    if (_monitor_events & type_) {
1699
        monitor_event (type_, value_, endpoint_uri_);
1700
    }
1701 1702
}

1703
//  Send a monitor event
1704 1705
void zmq::socket_base_t::monitor_event (int event_,
                                        intptr_t value_,
1706
                                        const std::string &endpoint_uri_) const
1707
{
1708 1709 1710
    // this is a private method which is only called from
    // contexts where the mutex has been locked before

1711
    if (_monitor_socket) {
1712
        //  Send event in first frame
1713 1714
        const uint16_t event = static_cast<uint16_t> (event_);
        const uint32_t value = static_cast<uint32_t> (value_);
1715
        zmq_msg_t msg;
1716
        zmq_msg_init_size (&msg, sizeof (event) + sizeof (value));
1717
        uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
1718
        //  Avoid dereferencing uint32_t on unaligned address
1719
        memcpy (data + 0, &event, sizeof (event));
1720
        memcpy (data + sizeof (event), &value, sizeof (value));
1721
        zmq_sendmsg (_monitor_socket, &msg, ZMQ_SNDMORE);
1722 1723

        //  Send address in second frame
1724 1725 1726
        zmq_msg_init_size (&msg, endpoint_uri_.size ());
        memcpy (zmq_msg_data (&msg), endpoint_uri_.c_str (),
                endpoint_uri_.size ());
1727
        zmq_sendmsg (_monitor_socket, &msg, 0);
1728
    }
1729 1730
}

1731
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
1732
{
1733 1734 1735
    // this is a private method which is only called from
    // contexts where the mutex has been locked before

1736 1737
    if (_monitor_socket) {
        if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
1738
            && send_monitor_stopped_event_)
1739
            monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
1740 1741 1742
        zmq_close (_monitor_socket);
        _monitor_socket = NULL;
        _monitor_events = 0;
1743
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
1744
}
1745 1746 1747 1748 1749 1750 1751 1752

//  Constructor. Forwards parent_, tid_ and sid_ unchanged to the
//  socket_base_t base class; the body itself does nothing.
zmq::routing_socket_base_t::routing_socket_base_t (class ctx_t *parent_,
                                                   uint32_t tid_,
                                                   int sid_) :
    socket_base_t (parent_, tid_, sid_)
{
}

1753 1754 1755 1756 1757
//  Destructor. Asserts that the outbound pipe table is already empty,
//  i.e. every record added via add_out_pipe () has been erased by now.
zmq::routing_socket_base_t::~routing_socket_base_t ()
{
    zmq_assert (_out_pipes.empty ());
}

1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776
//  Handles socket options specific to routing sockets. Only
//  ZMQ_CONNECT_ROUTING_ID is accepted here, and only with a non-NULL,
//  non-empty value, which is copied into _connect_routing_id.
//  Returns 0 on success; otherwise sets errno to EINVAL and returns -1.
int zmq::routing_socket_base_t::xsetsockopt (int option_,
                                             const void *optval_,
                                             size_t optvallen_)
{
    // TODO why isn't it possible to set an empty connect_routing_id
    //   (which is the default value)
    const bool is_routing_id_option = option_ == ZMQ_CONNECT_ROUTING_ID;
    const bool has_value = optval_ != NULL && optvallen_ != 0;
    if (!is_routing_id_option || !has_value) {
        errno = EINVAL;
        return -1;
    }

    const char *const bytes = static_cast<const char *> (optval_);
    _connect_routing_id.assign (bytes, optvallen_);
    return 0;
}

1777 1778
//  Marks the outbound record belonging to pipe_ as active again.
//  The record must exist and must currently be inactive; both conditions
//  are asserted.
void zmq::routing_socket_base_t::xwrite_activated (pipe_t *pipe_)
{
    //  The table is looked up by routing id elsewhere, so finding a record
    //  by its pipe pointer needs a linear scan.
    const out_pipes_t::iterator end = _out_pipes.end ();
    out_pipes_t::iterator it = _out_pipes.begin ();
    while (it != end && it->second.pipe != pipe_)
        ++it;

    zmq_assert (it != end);
    zmq_assert (!it->second.active);
    it->second.active = true;
}

1790 1791 1792 1793 1794 1795
//  Returns the stored connect routing id and leaves the member empty,
//  so the same id is not handed out twice.
std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
{
    //  Swapping with an empty string transfers the value out and resets
    //  the member in a single step.
    std::string res;
    res.swap (_connect_routing_id);
    return res;
}
1796

1797
bool zmq::routing_socket_base_t::connect_routing_id_is_set () const
1798 1799 1800 1801
{
    return !_connect_routing_id.empty ();
}

1802 1803
//  Registers pipe_ in the outbound pipe table under routing_id_.
//  The new record starts out active. Asserts that no record with the
//  same routing id was already present.
void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id_,
                                               pipe_t *pipe_)
{
    const out_pipe_t outpipe = {pipe_, true};

    //  The insertion result's second member tells whether a new entry
    //  was actually created.
    const std::pair<out_pipes_t::iterator, bool> inserted =
      _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id_), outpipe);
    const bool ok = inserted.second;
    zmq_assert (ok);
}

1813
bool zmq::routing_socket_base_t::has_out_pipe (const blob_t &routing_id_) const
1814
{
1815
    return 0 != _out_pipes.count (routing_id_);
1816 1817 1818
}

//  Finds the outbound record stored under routing_id_.
//  Returns a pointer to the record inside the table, or NULL if there
//  is none.
zmq::routing_socket_base_t::out_pipe_t *
zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_)
{
    // TODO we could probably avoid constructing a temporary blob_t to call this function
    const out_pipes_t::iterator pos = _out_pipes.find (routing_id_);
    if (pos == _out_pipes.end ())
        return NULL;
    return &pos->second;
}

//  Const overload: finds the outbound record stored under routing_id_.
//  Returns a pointer to the record inside the table, or NULL if there
//  is none.
const zmq::routing_socket_base_t::out_pipe_t *
zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_) const
{
    // TODO we could probably avoid constructing a temporary blob_t to call this function
    const out_pipes_t::const_iterator pos = _out_pipes.find (routing_id_);
    if (pos == _out_pipes.end ())
        return NULL;
    return &pos->second;
}

1834 1835
//  Removes the outbound record stored under the routing id reported by
//  pipe_. Asserts that a record was actually removed.
void zmq::routing_socket_base_t::erase_out_pipe (pipe_t *pipe_)
{
    const blob_t &routing_id = pipe_->get_routing_id ();
    const size_t erased = _out_pipes.erase (routing_id);
    zmq_assert (erased);
}

//  Removes the outbound record stored under routing_id_, if any, and
//  returns a copy of it. When no such record exists, nothing is erased
//  and {NULL, false} is returned.
zmq::routing_socket_base_t::out_pipe_t
zmq::routing_socket_base_t::try_erase_out_pipe (const blob_t &routing_id_)
{
    const out_pipes_t::iterator it = _out_pipes.find (routing_id_);
    if (it == _out_pipes.end ()) {
        out_pipe_t none = {NULL, false};
        return none;
    }

    //  Copy the record out before erasing invalidates the iterator.
    out_pipe_t removed = it->second;
    _out_pipes.erase (it);
    return removed;
}