socket_base.cpp 52.1 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include <new>
32
#include <string>
33 34
#include <algorithm>

35
#include "macros.hpp"
36 37 38

#if defined ZMQ_HAVE_WINDOWS
#if defined _MSC_VER
39
#if defined _WIN32_WCE
boris@boressoft.ru's avatar
boris@boressoft.ru committed
40 41
#include <cmnintrin.h>
#else
42 43
#include <intrin.h>
#endif
boris@boressoft.ru's avatar
boris@boressoft.ru committed
44
#endif
45 46
#else
#include <unistd.h>
47
#include <ctype.h>
48
#endif
49

50
#include "socket_base.hpp"
51
#include "tcp_listener.hpp"
52
#include "ipc_listener.hpp"
53
#include "tipc_listener.hpp"
54
#include "tcp_connecter.hpp"
55
#include "io_thread.hpp"
56
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
57
#include "config.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
58
#include "pipe.hpp"
59
#include "err.hpp"
60
#include "ctx.hpp"
61
#include "likely.hpp"
62
#include "msg.hpp"
63 64 65
#include "address.hpp"
#include "ipc_address.hpp"
#include "tcp_address.hpp"
66
#include "udp_address.hpp"
67
#include "tipc_address.hpp"
somdoron's avatar
somdoron committed
68 69
#include "mailbox.hpp"
#include "mailbox_safe.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
70 71 72 73 74 75

#if defined ZMQ_HAVE_VMCI
#include "vmci_address.hpp"
#include "vmci_listener.hpp"
#endif

76 77 78
#ifdef ZMQ_HAVE_OPENPGM
#include "pgm_socket.hpp"
#endif
79

80 81 82 83 84 85 86
#include "pair.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "pull.hpp"
#include "push.hpp"
87 88
#include "dealer.hpp"
#include "router.hpp"
89 90
#include "xpub.hpp"
#include "xsub.hpp"
91
#include "stream.hpp"
92
#include "server.hpp"
93
#include "client.hpp"
somdoron's avatar
somdoron committed
94 95
#include "radio.hpp"
#include "dish.hpp"
somdoron's avatar
somdoron committed
96 97
#include "gather.hpp"
#include "scatter.hpp"
98
#include "dgram.hpp"
99

100 101 102 103 104
bool zmq::socket_base_t::check_tag ()
{
    return tag == 0xbaddecaf;
}

105 106 107 108 109
bool zmq::socket_base_t::is_thread_safe () const
{
    return thread_safe;
}

110 111 112 113
zmq::socket_base_t *zmq::socket_base_t::create (int type_,
                                                class ctx_t *parent_,
                                                uint32_t tid_,
                                                int sid_)
114 115 116
{
    socket_base_t *s = NULL;
    switch (type_) {
Pieter Hintjens's avatar
Pieter Hintjens committed
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
        case ZMQ_PAIR:
            s = new (std::nothrow) pair_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUB:
            s = new (std::nothrow) pub_t (parent_, tid_, sid_);
            break;
        case ZMQ_SUB:
            s = new (std::nothrow) sub_t (parent_, tid_, sid_);
            break;
        case ZMQ_REQ:
            s = new (std::nothrow) req_t (parent_, tid_, sid_);
            break;
        case ZMQ_REP:
            s = new (std::nothrow) rep_t (parent_, tid_, sid_);
            break;
        case ZMQ_DEALER:
            s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
            break;
        case ZMQ_ROUTER:
            s = new (std::nothrow) router_t (parent_, tid_, sid_);
            break;
        case ZMQ_PULL:
            s = new (std::nothrow) pull_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUSH:
            s = new (std::nothrow) push_t (parent_, tid_, sid_);
            break;
        case ZMQ_XPUB:
            s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
            break;
        case ZMQ_XSUB:
            s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
            break;
        case ZMQ_STREAM:
            s = new (std::nothrow) stream_t (parent_, tid_, sid_);
            break;
153 154 155
        case ZMQ_SERVER:
            s = new (std::nothrow) server_t (parent_, tid_, sid_);
            break;
156 157 158
        case ZMQ_CLIENT:
            s = new (std::nothrow) client_t (parent_, tid_, sid_);
            break;
somdoron's avatar
somdoron committed
159 160 161 162 163 164
        case ZMQ_RADIO:
            s = new (std::nothrow) radio_t (parent_, tid_, sid_);
            break;
        case ZMQ_DISH:
            s = new (std::nothrow) dish_t (parent_, tid_, sid_);
            break;
somdoron's avatar
somdoron committed
165 166 167 168 169 170
        case ZMQ_GATHER:
            s = new (std::nothrow) gather_t (parent_, tid_, sid_);
            break;
        case ZMQ_SCATTER:
            s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
            break;
171 172 173
        case ZMQ_DGRAM:
            s = new (std::nothrow) dgram_t (parent_, tid_, sid_);
            break;
Pieter Hintjens's avatar
Pieter Hintjens committed
174 175 176
        default:
            errno = EINVAL;
            return NULL;
177
    }
178 179

    alloc_assert (s);
somdoron's avatar
somdoron committed
180

181
    if (s->mailbox == NULL) {
182
        s->destroyed = true;
183
        LIBZMQ_DELETE (s);
Pieter Hintjens's avatar
Pieter Hintjens committed
184
        return NULL;
185
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
186

187 188 189
    return s;
}

190 191 192 193
zmq::socket_base_t::socket_base_t (ctx_t *parent_,
                                   uint32_t tid_,
                                   int sid_,
                                   bool thread_safe_) :
Martin Sustrik's avatar
Martin Sustrik committed
194
    own_t (parent_, tid_),
195
    tag (0xbaddecaf),
196
    ctx_terminated (false),
197
    destroyed (false),
198
    poller (NULL),
199
    handle (static_cast<poller_t::handle_t> (NULL)),
Martin Sustrik's avatar
Martin Sustrik committed
200
    last_tsc (0),
Martin Sustrik's avatar
Martin Sustrik committed
201
    ticks (0),
202 203
    rcvmore (false),
    monitor_socket (NULL),
somdoron's avatar
somdoron committed
204
    monitor_events (0),
205
    thread_safe (thread_safe_),
206
    reaper_signaler (NULL),
207 208
    sync (),
    monitor_sync ()
Martin Sustrik's avatar
Martin Sustrik committed
209
{
210
    options.socket_id = sid_;
211
    options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
212
    options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0);
213
    options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV) != 0;
somdoron's avatar
somdoron committed
214

215 216
    if (thread_safe) {
        mailbox = new (std::nothrow) mailbox_safe_t (&sync);
217
        zmq_assert (mailbox);
218 219
    } else {
        mailbox_t *m = new (std::nothrow) mailbox_t ();
220
        zmq_assert (m);
221

222 223 224 225 226 227 228
        if (m->get_fd () != retired_fd)
            mailbox = m;
        else {
            LIBZMQ_DELETE (m);
            mailbox = NULL;
        }
    }
229 230
}

231 232
int zmq::socket_base_t::get_peer_state (const void *routing_id_,
                                        size_t routing_id_size_) const
233
{
234 235
    LIBZMQ_UNUSED (routing_id_);
    LIBZMQ_UNUSED (routing_id_size_);
236

237 238 239 240 241
    //  Only ROUTER sockets support this
    errno = ENOTSUP;
    return -1;
}

242 243
zmq::socket_base_t::~socket_base_t ()
{
244
    if (mailbox)
245
        LIBZMQ_DELETE (mailbox);
Ilya Kulakov's avatar
Ilya Kulakov committed
246

247
    if (reaper_signaler)
248
        LIBZMQ_DELETE (reaper_signaler);
Ilya Kulakov's avatar
Ilya Kulakov committed
249

250
    scoped_lock_t lock (monitor_sync);
251
    stop_monitor ();
252

253
    zmq_assert (destroyed);
254 255
}

somdoron's avatar
somdoron committed
256
zmq::i_mailbox *zmq::socket_base_t::get_mailbox ()
257
{
somdoron's avatar
somdoron committed
258
    return mailbox;
259 260 261 262
}

void zmq::socket_base_t::stop ()
{
263 264
    //  Called by ctx when it is terminated (zmq_ctx_term).
    //  'stop' command is sent from the threads that called zmq_ctx_term to
265 266 267 268 269
    //  the thread owning the socket. This way, blocking call in the
    //  owner thread can be interrupted.
    send_stop ();
}

270
int zmq::socket_base_t::parse_uri (const char *uri_,
271 272
                                   std::string &protocol_,
                                   std::string &address_)
273 274 275 276 277 278 279 280 281 282 283
{
    zmq_assert (uri_ != NULL);

    std::string uri (uri_);
    std::string::size_type pos = uri.find ("://");
    if (pos == std::string::npos) {
        errno = EINVAL;
        return -1;
    }
    protocol_ = uri.substr (0, pos);
    address_ = uri.substr (pos + 3);
Martin Hurton's avatar
Martin Hurton committed
284

285 286 287 288 289 290 291
    if (protocol_.empty () || address_.empty ()) {
        errno = EINVAL;
        return -1;
    }
    return 0;
}

292 293
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
294
    //  First check out whether the protocol is something we are aware of.
Pieter Hintjens's avatar
Pieter Hintjens committed
295
    if (protocol_ != "inproc"
296 297
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
298
        && protocol_ != "ipc"
299
#endif
300
        && protocol_ != "tcp"
301
#if defined ZMQ_HAVE_OPENPGM
302 303 304
        //  pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
        && protocol_ != "pgm"
        && protocol_ != "epgm"
305 306
#endif
#if defined ZMQ_HAVE_TIPC
307 308
        // TIPC transport is only available on Linux.
        && protocol_ != "tipc"
309
#endif
310
#if defined ZMQ_HAVE_NORM
311
        && protocol_ != "norm"
312
#endif
313
#if defined ZMQ_HAVE_VMCI
314
        && protocol_ != "vmci"
315
#endif
316
        && protocol_ != "udp") {
Ilya Kulakov's avatar
Ilya Kulakov committed
317 318 319 320
        errno = EPROTONOSUPPORT;
        return -1;
    }

321 322 323
        //  Check whether socket type and transport protocol match.
        //  Specifically, multicast protocols can't be combined with
        //  bi-directional messaging patterns (socket types).
324
#if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
325 326 327
    if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm")
        && options.type != ZMQ_PUB && options.type != ZMQ_SUB
        && options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
328 329 330
        errno = ENOCOMPATPROTO;
        return -1;
    }
331
#endif
332

333 334 335
    if (protocol_ == "udp"
        && (options.type != ZMQ_DISH && options.type != ZMQ_RADIO
            && options.type != ZMQ_DGRAM)) {
336
        errno = ENOCOMPATPROTO;
337
        return -1;
338
    }
339

340 341 342 343
    //  Protocol is available.
    return 0;
}

344
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
345
{
346 347 348
    //  First, register the pipe so that we can terminate it later on.
    pipe_->set_event_sink (this);
    pipes.push_back (pipe_);
349

350
    //  Let the derived socket type know about new pipe.
351
    xattach_pipe (pipe_, subscribe_to_all_);
352 353 354 355 356

    //  If the socket is already being closed, ask any new pipes to terminate
    //  straight away.
    if (is_terminating ()) {
        register_term_acks (1);
357
        pipe_->terminate (false);
358
    }
359 360
}

361 362 363
int zmq::socket_base_t::setsockopt (int option_,
                                    const void *optval_,
                                    size_t optvallen_)
364
{
365
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
366

367
    if (!options.is_valid (option_)) {
368 369 370
        errno = EINVAL;
        return -1;
    }
371

372
    if (unlikely (ctx_terminated)) {
373 374 375 376
        errno = ETERM;
        return -1;
    }

377 378
    //  First, check whether specific socket type overloads the option.
    int rc = xsetsockopt (option_, optval_, optvallen_);
somdoron's avatar
somdoron committed
379
    if (rc == 0 || errno != EINVAL) {
380
        return rc;
somdoron's avatar
somdoron committed
381
    }
382 383 384

    //  If the socket type doesn't support the option, pass it to
    //  the generic option parser.
somdoron's avatar
somdoron committed
385
    rc = options.setsockopt (option_, optval_, optvallen_);
386
    update_pipe_options (option_);
somdoron's avatar
somdoron committed
387 388

    return rc;
389 390
}

391 392 393
int zmq::socket_base_t::getsockopt (int option_,
                                    void *optval_,
                                    size_t *optvallen_)
394
{
395
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
396

397
    if (unlikely (ctx_terminated)) {
398 399 400 401
        errno = ETERM;
        return -1;
    }

402
    if (option_ == ZMQ_RCVMORE) {
403
        return do_getsockopt<int> (optval_, optvallen_, rcvmore ? 1 : 0);
404 405
    }

406
    if (option_ == ZMQ_FD) {
somdoron's avatar
somdoron committed
407 408 409 410 411
        if (thread_safe) {
            // thread safe socket doesn't provide file descriptor
            errno = EINVAL;
            return -1;
        }
412

413 414
        return do_getsockopt<fd_t> (
          optval_, optvallen_, (static_cast<mailbox_t *> (mailbox))->get_fd ());
415 416 417
    }

    if (option_ == ZMQ_EVENTS) {
418
        int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
419
        if (rc != 0 && (errno == EINTR || errno == ETERM)) {
420
            return -1;
somdoron's avatar
somdoron committed
421
        }
422
        errno_assert (rc == 0);
423 424 425 426

        return do_getsockopt<int> (optval_, optvallen_,
                                   (has_out () ? ZMQ_POLLOUT : 0)
                                     | (has_in () ? ZMQ_POLLIN : 0));
427 428
    }

429
    if (option_ == ZMQ_LAST_ENDPOINT) {
430
        return do_getsockopt (optval_, optvallen_, last_endpoint);
431 432
    }

433
    if (option_ == ZMQ_THREAD_SAFE) {
434
        return do_getsockopt<int> (optval_, optvallen_, thread_safe ? 1 : 0);
Ilya Kulakov's avatar
Ilya Kulakov committed
435
    }
436

437
    return options.getsockopt (option_, optval_, optvallen_);
438 439
}

440
int zmq::socket_base_t::join (const char *group_)
somdoron's avatar
somdoron committed
441
{
442
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
443 444 445 446 447 448 449

    int rc = xjoin (group_);


    return rc;
}

450
int zmq::socket_base_t::leave (const char *group_)
somdoron's avatar
somdoron committed
451
{
452
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
453 454 455 456 457 458 459

    int rc = xleave (group_);


    return rc;
}

460
void zmq::socket_base_t::add_signaler (signaler_t *s_)
461
{
462
    zmq_assert (thread_safe);
463

464
    scoped_lock_t sync_lock (sync);
465
    (static_cast<mailbox_safe_t *> (mailbox))->add_signaler (s_);
466 467
}

468
void zmq::socket_base_t::remove_signaler (signaler_t *s_)
469
{
470
    zmq_assert (thread_safe);
471

472
    scoped_lock_t sync_lock (sync);
473
    (static_cast<mailbox_safe_t *> (mailbox))->remove_signaler (s_);
474 475
}

476 477
int zmq::socket_base_t::bind (const char *addr_)
{
478
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
479

480
    if (unlikely (ctx_terminated)) {
481 482 483 484
        errno = ETERM;
        return -1;
    }

485 486
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
487
    if (unlikely (rc != 0)) {
488
        return -1;
somdoron's avatar
somdoron committed
489
    }
490

491
    //  Parse addr_ string.
492 493
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
494
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
495
        return -1;
somdoron's avatar
somdoron committed
496
    }
497

Pieter Hintjens's avatar
Pieter Hintjens committed
498
    if (protocol == "inproc") {
499
        const endpoint_t endpoint = {this, options};
500
        rc = register_endpoint (addr_, endpoint);
501
        if (rc == 0) {
Martin Hurton's avatar
Martin Hurton committed
502
            connect_pending (addr_, this);
503
            last_endpoint.assign (addr_);
504
            options.connected = true;
505 506
        }
        return rc;
507
    }
508

509
    if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
510
        //  For convenience's sake, bind can be used interchangeable with
511
        //  connect for PGM, EPGM, NORM transports.
512 513 514 515
        rc = connect (addr_);
        if (rc != -1)
            options.connected = true;
        return rc;
516 517
    }

518 519 520 521 522 523 524 525 526 527 528 529 530
    if (protocol == "udp") {
        if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
            errno = ENOCOMPATPROTO;
            return -1;
        }

        //  Choose the I/O thread to run the session in.
        io_thread_t *io_thread = choose_io_thread (options.affinity);
        if (!io_thread) {
            errno = EMTHREAD;
            return -1;
        }

531 532
        address_t *paddr =
          new (std::nothrow) address_t (protocol, address, this->get_ctx ());
533 534 535 536
        alloc_assert (paddr);

        paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
        alloc_assert (paddr->resolved.udp_addr);
537 538
        rc = paddr->resolved.udp_addr->resolve (address.c_str (), true,
                                                options.ipv6);
539
        if (rc != 0) {
540
            LIBZMQ_DELETE (paddr);
541 542 543
            return -1;
        }

544 545
        session_base_t *session =
          session_base_t::create (io_thread, true, this, options, paddr);
546 547 548 549 550
        errno_assert (session);

        pipe_t *newpipe = NULL;

        //  Create a bi-directional pipe.
551 552
        object_t *parents[2] = {this, session};
        pipe_t *new_pipes[2] = {NULL, NULL};
553

554 555
        int hwms[2] = {options.sndhwm, options.rcvhwm};
        bool conflates[2] = {false, false};
556 557 558 559
        rc = pipepair (parents, new_pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  Attach local end of the pipe to the socket object.
560 561
        attach_pipe (new_pipes[0], true);
        newpipe = new_pipes[0];
562 563

        //  Attach remote end of the pipe to the session object later on.
564
        session->attach_pipe (new_pipes[1]);
565 566 567 568 569 570 571 572 573

        //  Save last endpoint URI
        paddr->to_string (last_endpoint);

        add_endpoint (addr_, (own_t *) session, newpipe);

        return 0;
    }

574
    //  Remaining transports require to be run in an I/O thread, so at this
575 576 577 578 579 580
    //  point we'll choose one.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        return -1;
    }
581

582
    if (protocol == "tcp") {
583 584
        tcp_listener_t *listener =
          new (std::nothrow) tcp_listener_t (io_thread, this, options);
585
        alloc_assert (listener);
586
        rc = listener->set_address (address.c_str ());
587
        if (rc != 0) {
588 589
            LIBZMQ_DELETE (listener);
            event_bind_failed (address, zmq_errno ());
590
            return -1;
591
        }
592

593
        // Save last endpoint URI
594
        listener->get_address (last_endpoint);
595

596
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
597
        options.connected = true;
598 599 600
        return 0;
    }

601 602
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
603
    if (protocol == "ipc") {
604 605
        ipc_listener_t *listener =
          new (std::nothrow) ipc_listener_t (io_thread, this, options);
606
        alloc_assert (listener);
607
        int rc = listener->set_address (address.c_str ());
608
        if (rc != 0) {
609 610
            LIBZMQ_DELETE (listener);
            event_bind_failed (address, zmq_errno ());
611 612
            return -1;
        }
613

614
        // Save last endpoint URI
615
        listener->get_address (last_endpoint);
616

617
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
618
        options.connected = true;
619
        return 0;
620
    }
621
#endif
622
#if defined ZMQ_HAVE_TIPC
623
    if (protocol == "tipc") {
624 625 626 627 628 629 630 631 632
        tipc_listener_t *listener =
          new (std::nothrow) tipc_listener_t (io_thread, this, options);
        alloc_assert (listener);
        int rc = listener->set_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE (listener);
            event_bind_failed (address, zmq_errno ());
            return -1;
        }
633 634 635 636 637

        // Save last endpoint URI
        listener->get_address (last_endpoint);

        add_endpoint (addr_, (own_t *) listener, NULL);
638
        options.connected = true;
639 640 641
        return 0;
    }
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
642 643
#if defined ZMQ_HAVE_VMCI
    if (protocol == "vmci") {
644 645
        vmci_listener_t *listener =
          new (std::nothrow) vmci_listener_t (io_thread, this, options);
Ilya Kulakov's avatar
Ilya Kulakov committed
646 647 648
        alloc_assert (listener);
        int rc = listener->set_address (address.c_str ());
        if (rc != 0) {
649
            LIBZMQ_DELETE (listener);
Ilya Kulakov's avatar
Ilya Kulakov committed
650 651 652 653 654 655
            event_bind_failed (address, zmq_errno ());
            return -1;
        }

        listener->get_address (last_endpoint);

656
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
Ilya Kulakov's avatar
Ilya Kulakov committed
657 658 659 660
        options.connected = true;
        return 0;
    }
#endif
661

662
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
663
    return -1;
664 665
}

666
int zmq::socket_base_t::connect (const char *addr_)
667
{
668
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
669

670
    if (unlikely (ctx_terminated)) {
671 672 673 674
        errno = ETERM;
        return -1;
    }

675 676
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
677
    if (unlikely (rc != 0)) {
678
        return -1;
somdoron's avatar
somdoron committed
679
    }
680

malosek's avatar
malosek committed
681
    //  Parse addr_ string.
682 683
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
684
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
685
        return -1;
somdoron's avatar
somdoron committed
686
    }
malosek's avatar
malosek committed
687

Pieter Hintjens's avatar
Pieter Hintjens committed
688
    if (protocol == "inproc") {
689 690 691 692
        //  TODO: inproc connect is specific with respect to creating pipes
        //  as there's no 'reconnect' functionality implemented. Once that
        //  is in place we should follow generic pipe creation algorithm.

693 694
        //  Find the peer endpoint.
        endpoint_t peer = find_endpoint (addr_);
695

696
        // The total HWM for an inproc connection should be the sum of
697
        // the binder's HWM and the connector's HWM.
Martin Hurton's avatar
Martin Hurton committed
698
        int sndhwm = 0;
699 700 701
        if (peer.socket == NULL)
            sndhwm = options.sndhwm;
        else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
702
            sndhwm = options.sndhwm + peer.options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
703
        int rcvhwm = 0;
704 705
        if (peer.socket == NULL)
            rcvhwm = options.rcvhwm;
706
        else if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
707
            rcvhwm = options.rcvhwm + peer.options.sndhwm;
708

709
        //  Create a bi-directional pipe to connect the peers.
710 711 712 713 714 715 716 717 718 719 720
        object_t *parents[2] = {this, peer.socket == NULL ? this : peer.socket};
        pipe_t *new_pipes[2] = {NULL, NULL};

        bool conflate =
          options.conflate
          && (options.type == ZMQ_DEALER || options.type == ZMQ_PULL
              || options.type == ZMQ_PUSH || options.type == ZMQ_PUB
              || options.type == ZMQ_SUB);

        int hwms[2] = {conflate ? -1 : sndhwm, conflate ? -1 : rcvhwm};
        bool conflates[2] = {conflate, conflate};
721
        rc = pipepair (parents, new_pipes, hwms, conflates);
722
        if (!conflate) {
723 724 725
            new_pipes[0]->set_hwms_boost (peer.options.sndhwm,
                                          peer.options.rcvhwm);
            new_pipes[1]->set_hwms_boost (options.sndhwm, options.rcvhwm);
726 727
        }

728
        errno_assert (rc == 0);
729

730 731
        if (!peer.socket) {
            //  The peer doesn't exist yet so we don't know whether
732 733
            //  to send the routing id message or not. To resolve this,
            //  we always send our routing id and drop it later if
734 735
            //  the peer doesn't expect it.
            msg_t id;
736
            rc = id.init_size (options.routing_id_size);
737
            errno_assert (rc == 0);
738 739
            memcpy (id.data (), options.routing_id, options.routing_id_size);
            id.set_flags (msg_t::routing_id);
740
            bool written = new_pipes[0]->write (&id);
741
            zmq_assert (written);
742
            new_pipes[0]->flush ();
743

Martin Hurton's avatar
Martin Hurton committed
744 745
            const endpoint_t endpoint = {this, options};
            pend_connection (std::string (addr_), endpoint, new_pipes);
746
        } else {
747 748
            //  If required, send the routing id of the local socket to the peer.
            if (peer.options.recv_routing_id) {
749
                msg_t id;
750
                rc = id.init_size (options.routing_id_size);
751
                errno_assert (rc == 0);
752 753
                memcpy (id.data (), options.routing_id,
                        options.routing_id_size);
754
                id.set_flags (msg_t::routing_id);
755
                bool written = new_pipes[0]->write (&id);
756
                zmq_assert (written);
757
                new_pipes[0]->flush ();
758
            }
759

760 761
            //  If required, send the routing id of the peer to the local socket.
            if (options.recv_routing_id) {
762
                msg_t id;
763
                rc = id.init_size (peer.options.routing_id_size);
764
                errno_assert (rc == 0);
765 766
                memcpy (id.data (), peer.options.routing_id,
                        peer.options.routing_id_size);
767
                id.set_flags (msg_t::routing_id);
768
                bool written = new_pipes[1]->write (&id);
769
                zmq_assert (written);
770
                new_pipes[1]->flush ();
771
            }
772

773 774 775
            //  Attach remote end of the pipe to the peer socket. Note that peer's
            //  seqnum was incremented in find_endpoint function. We don't need it
            //  increased here.
776
            send_bind (peer.socket, new_pipes[1], false);
777
        }
778

Martin Hurton's avatar
Martin Hurton committed
779
        //  Attach local end of the pipe to this socket object.
780
        attach_pipe (new_pipes[0]);
Martin Hurton's avatar
Martin Hurton committed
781

782
        // Save last endpoint URI
783
        last_endpoint.assign (addr_);
784

785
        // remember inproc connections for disconnect
786
        inprocs.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_), new_pipes[0]);
787

788
        options.connected = true;
789 790
        return 0;
    }
791 792 793
    bool is_single_connect =
      (options.type == ZMQ_DEALER || options.type == ZMQ_SUB
       || options.type == ZMQ_PUB || options.type == ZMQ_REQ);
794
    if (unlikely (is_single_connect)) {
Martin Hurton's avatar
Martin Hurton committed
795
        const endpoints_t::iterator it = endpoints.find (addr_);
796 797 798 799 800 801 802
        if (it != endpoints.end ()) {
            // There is no valid use for multiple connects for SUB-PUB nor
            // DEALER-ROUTER nor REQ-REP. Multiple connects produces
            // nonsensical results.
            return 0;
        }
    }
803

804 805 806 807 808 809 810
    //  Choose the I/O thread to run the session in.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        return -1;
    }

811 812
    address_t *paddr =
      new (std::nothrow) address_t (protocol, address, this->get_ctx ());
813
    alloc_assert (paddr);
814 815 816

    //  Resolve address (if needed by the protocol)
    if (protocol == "tcp") {
817 818 819
        //  Do some basic sanity checks on tcp:// address syntax
        //  - hostname starts with digit or letter, with embedded '-' or '.'
        //  - IPv6 address may contain hex chars and colons.
820 821
        //  - IPv6 link local address may contain % followed by interface name / zone_id
        //    (Reference: https://tools.ietf.org/html/rfc4007)
822 823 824 825 826 827
        //  - IPv4 address may contain decimal digits and dots.
        //  - Address must end in ":port" where port is *, or numeric
        //  - Address may contain two parts separated by ':'
        //  Following code is quick and dirty check to catch obvious errors,
        //  without trying to be fully accurate.
        const char *check = address.c_str ();
828 829
        if (isalnum (*check) || isxdigit (*check) || *check == '['
            || *check == ':') {
830
            check++;
831 832 833 834
            while (isalnum (*check) || isxdigit (*check) || *check == '.'
                   || *check == '-' || *check == ':' || *check == '%'
                   || *check == ';' || *check == '[' || *check == ']'
                   || *check == '_' || *check == '*') {
835
                check++;
836
            }
837 838 839 840 841 842 843 844 845 846
        }
        //  Assume the worst, now look for success
        rc = -1;
        //  Did we reach the end of the address safely?
        if (*check == 0) {
            //  Do we have a valid port string? (cannot be '*' in connect
            check = strrchr (address.c_str (), ':');
            if (check) {
                check++;
                if (*check && (isdigit (*check)))
847
                    rc = 0; //  Valid
848 849 850
            }
        }
        if (rc == -1) {
851
            errno = EINVAL;
852
            LIBZMQ_DELETE (paddr);
853 854 855
            return -1;
        }
        //  Defer resolution until a socket is opened
856
        paddr->resolved.tcp_addr = NULL;
857
    }
858 859
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
860
    else if (protocol == "ipc") {
861
        paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
862
        alloc_assert (paddr->resolved.ipc_addr);
863 864
        int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
        if (rc != 0) {
865
            LIBZMQ_DELETE (paddr);
866 867 868
            return -1;
        }
    }
869
#endif
870

871
    if (protocol == "udp") {
872 873
        if (options.type != ZMQ_RADIO) {
            errno = ENOCOMPATPROTO;
874
            LIBZMQ_DELETE (paddr);
875 876
            return -1;
        }
877

878 879
        paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
        alloc_assert (paddr->resolved.udp_addr);
880 881
        rc = paddr->resolved.udp_addr->resolve (address.c_str (), false,
                                                options.ipv6);
882
        if (rc != 0) {
883
            LIBZMQ_DELETE (paddr);
884 885
            return -1;
        }
886 887
    }

888
        // TBD - Should we check address for ZMQ_HAVE_NORM???
889

890 891 892 893
#ifdef ZMQ_HAVE_OPENPGM
    if (protocol == "pgm" || protocol == "epgm") {
        struct pgm_addrinfo_t *res = NULL;
        uint16_t port_number = 0;
894 895
        int rc =
          pgm_socket_t::init_address (address.c_str (), &res, &port_number);
896 897
        if (res != NULL)
            pgm_freeaddrinfo (res);
898
        if (rc != 0 || port_number == 0) {
899
            return -1;
900
        }
901
    }
902
#endif
903
#if defined ZMQ_HAVE_TIPC
904
    else if (protocol == "tipc") {
905 906
        paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
        alloc_assert (paddr->resolved.tipc_addr);
907
        int rc = paddr->resolved.tipc_addr->resolve (address.c_str ());
908
        if (rc != 0) {
909
            LIBZMQ_DELETE (paddr);
910 911
            return -1;
        }
912 913 914 915 916 917 918 919 920
        sockaddr_tipc *saddr =
          (sockaddr_tipc *) paddr->resolved.tipc_addr->addr ();
        // Cannot connect to random Port Identity
        if (saddr->addrtype == TIPC_ADDR_ID
            && paddr->resolved.tipc_addr->is_random ()) {
            LIBZMQ_DELETE (paddr);
            errno = EINVAL;
            return -1;
        }
921 922
    }
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
923
#if defined ZMQ_HAVE_VMCI
924 925 926
    else if (protocol == "vmci") {
        paddr->resolved.vmci_addr =
          new (std::nothrow) vmci_address_t (this->get_ctx ());
Ilya Kulakov's avatar
Ilya Kulakov committed
927 928 929
        alloc_assert (paddr->resolved.vmci_addr);
        int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
        if (rc != 0) {
930
            LIBZMQ_DELETE (paddr);
Ilya Kulakov's avatar
Ilya Kulakov committed
931 932 933 934
            return -1;
        }
    }
#endif
935

936
    //  Create session.
937 938
    session_base_t *session =
      session_base_t::create (io_thread, true, this, options, paddr);
939
    errno_assert (session);
940

941
    //  PGM does not support subscription forwarding; ask for all data to be
bebopagogo's avatar
bebopagogo committed
942
    //  sent to this pipe. (same for NORM, currently?)
943 944
    bool subscribe_to_all = protocol == "pgm" || protocol == "epgm"
                            || protocol == "norm" || protocol == "udp";
945
    pipe_t *newpipe = NULL;
946

947
    if (options.immediate != 1 || subscribe_to_all) {
948
        //  Create a bi-directional pipe.
949 950 951 952 953 954 955 956 957 958 959 960
        object_t *parents[2] = {this, session};
        pipe_t *new_pipes[2] = {NULL, NULL};

        bool conflate =
          options.conflate
          && (options.type == ZMQ_DEALER || options.type == ZMQ_PULL
              || options.type == ZMQ_PUSH || options.type == ZMQ_PUB
              || options.type == ZMQ_SUB);

        int hwms[2] = {conflate ? -1 : options.sndhwm,
                       conflate ? -1 : options.rcvhwm};
        bool conflates[2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
961
        rc = pipepair (parents, new_pipes, hwms, conflates);
962
        errno_assert (rc == 0);
963

964
        //  Attach local end of the pipe to the socket object.
965 966
        attach_pipe (new_pipes[0], subscribe_to_all);
        newpipe = new_pipes[0];
Martin Sustrik's avatar
Martin Sustrik committed
967

968
        //  Attach remote end of the pipe to the session object later on.
969
        session->attach_pipe (new_pipes[1]);
970 971 972
    }

    //  Save last endpoint URI
973
    paddr->to_string (last_endpoint);
974

975
    add_endpoint (addr_, (own_t *) session, newpipe);
976 977 978
    return 0;
}

979 980 981
void zmq::socket_base_t::add_endpoint (const char *addr_,
                                       own_t *endpoint_,
                                       pipe_t *pipe)
982
{
983
    //  Activate the session. Make it a child of this socket.
984
    launch_child (endpoint_);
985
    endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_),
986
                                         endpoint_pipe_t (endpoint_, pipe));
987 988 989 990
}

int zmq::socket_base_t::term_endpoint (const char *addr_)
{
991
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
992

993 994 995 996 997
    //  Check whether the library haven't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }
malosek's avatar
malosek committed
998

999
    //  Check whether endpoint address passed to the function is valid.
1000 1001 1002 1003 1004
    if (unlikely (!addr_)) {
        errno = EINVAL;
        return -1;
    }

1005 1006 1007
    //  Process pending commands, if any, since there could be pending unprocessed process_own()'s
    //  (from launch_child() for example) we're asked to terminate now.
    int rc = process_commands (0, false);
1008
    if (unlikely (rc != 0)) {
1009
        return -1;
somdoron's avatar
somdoron committed
1010
    }
1011

1012 1013 1014
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
1015
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
1016
        return -1;
somdoron's avatar
somdoron committed
1017
    }
1018

1019 1020
    const std::string addr_str = std::string (addr_);

1021 1022
    // Disconnect an inproc socket
    if (protocol == "inproc") {
1023
        if (unregister_endpoint (addr_str, this) == 0) {
Martin Hurton's avatar
Martin Hurton committed
1024
            return 0;
somdoron's avatar
somdoron committed
1025
        }
1026
        std::pair<inprocs_t::iterator, inprocs_t::iterator> range =
1027
          inprocs.equal_range (addr_str);
1028 1029 1030 1031
        if (range.first == range.second) {
            errno = ENOENT;
            return -1;
        }
Martin Hurton's avatar
Martin Hurton committed
1032

1033
        for (inprocs_t::iterator it = range.first; it != range.second; ++it)
Martin Hurton's avatar
Martin Hurton committed
1034
            it->second->terminate (true);
1035 1036 1037 1038
        inprocs.erase (range.first, range.second);
        return 0;
    }

1039
    std::string resolved_addr = addr_;
1040 1041 1042 1043 1044 1045 1046

    // The resolved last_endpoint is used as a key in the endpoints map.
    // The address passed by the user might not match in the TCP case due to
    // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
    // resolve before giving up. Given at this stage we don't know whether a
    // socket is connected or bound, try with both.
    if (protocol == "tcp") {
1047
        if (endpoints.find (resolved_addr) == endpoints.end ()) {
1048 1049 1050 1051 1052 1053
            tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
            alloc_assert (tcp_addr);
            rc = tcp_addr->resolve (address.c_str (), false, options.ipv6);

            if (rc == 0) {
                tcp_addr->to_string (resolved_addr);
1054
                if (endpoints.find (resolved_addr) == endpoints.end ()) {
1055 1056
                    rc =
                      tcp_addr->resolve (address.c_str (), true, options.ipv6);
1057 1058 1059 1060 1061
                    if (rc == 0) {
                        tcp_addr->to_string (resolved_addr);
                    }
                }
            }
1062
            LIBZMQ_DELETE (tcp_addr);
1063 1064 1065
        }
    }

1066
    //  Find the endpoints range (if any) corresponding to the addr_ string.
1067 1068
    const std::pair<endpoints_t::iterator, endpoints_t::iterator> range =
      endpoints.equal_range (resolved_addr);
1069 1070
    if (range.first == range.second) {
        errno = ENOENT;
1071
        return -1;
1072
    }
1073

1074 1075 1076
    for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
        //  If we have an associated pipe, terminate it.
        if (it->second.second != NULL)
Martin Hurton's avatar
Martin Hurton committed
1077
            it->second.second->terminate (false);
1078
        term_child (it->second.first);
1079
    }
1080
    endpoints.erase (range.first, range.second);
1081
    return 0;
1082 1083
}

1084
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
1085
{
1086
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
1087

1088
    //  Check whether the library haven't been shut down yet.
1089
    if (unlikely (ctx_terminated)) {
1090 1091 1092 1093
        errno = ETERM;
        return -1;
    }

1094
    //  Check whether message passed to the function is valid.
1095
    if (unlikely (!msg_ || !msg_->check ())) {
1096 1097 1098 1099
        errno = EFAULT;
        return -1;
    }

1100
    //  Process pending commands, if any.
1101
    int rc = process_commands (0, true);
somdoron's avatar
somdoron committed
1102
    if (unlikely (rc != 0)) {
1103
        return -1;
somdoron's avatar
somdoron committed
1104
    }
1105

1106 1107 1108
    //  Clear any user-visible flags that are set on the message.
    msg_->reset_flags (msg_t::more);

1109
    //  At this point we impose the flags on the message.
1110
    if (flags_ & ZMQ_SNDMORE)
1111
        msg_->set_flags (msg_t::more);
1112

1113 1114
    msg_->reset_metadata ();

1115
    //  Try to send the message using method in each socket class
1116
    rc = xsend (msg_);
somdoron's avatar
somdoron committed
1117
    if (rc == 0) {
1118
        return 0;
somdoron's avatar
somdoron committed
1119
    }
1120
    if (unlikely (errno != EAGAIN)) {
1121
        return -1;
somdoron's avatar
somdoron committed
1122
    }
Martin Sustrik's avatar
Martin Sustrik committed
1123

1124
    //  In case of non-blocking send we'll simply propagate
1125
    //  the error - including EAGAIN - up the stack.
1126
    if ((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0) {
Martin Sustrik's avatar
Martin Sustrik committed
1127
        return -1;
somdoron's avatar
somdoron committed
1128
    }
Martin Sustrik's avatar
Martin Sustrik committed
1129

1130
    //  Compute the time when the timeout should occur.
Matt Arsenault's avatar
Matt Arsenault committed
1131
    //  If the timeout is infinite, don't care.
1132 1133 1134
    int timeout = options.sndtimeo;
    uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);

1135 1136
    //  Oops, we couldn't send the message. Wait for the next
    //  command, process it and try to send the message again.
1137 1138
    //  If timeout is reached in the meantime, return EAGAIN.
    while (true) {
somdoron's avatar
somdoron committed
1139
        if (unlikely (process_commands (timeout, false) != 0)) {
1140
            return -1;
1141
        }
1142
        rc = xsend (msg_);
1143 1144
        if (rc == 0)
            break;
somdoron's avatar
somdoron committed
1145
        if (unlikely (errno != EAGAIN)) {
1146
            return -1;
somdoron's avatar
somdoron committed
1147
        }
1148
        if (timeout > 0) {
1149
            timeout = static_cast<int> (end - clock.now_ms ());
1150 1151 1152 1153 1154
            if (timeout <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
1155
    }
somdoron's avatar
somdoron committed
1156

Martin Sustrik's avatar
Martin Sustrik committed
1157
    return 0;
1158 1159
}

1160
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
1161
{
1162
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
1163

1164
    //  Check whether the library haven't been shut down yet.
1165
    if (unlikely (ctx_terminated)) {
1166 1167 1168 1169
        errno = ETERM;
        return -1;
    }

1170
    //  Check whether message passed to the function is valid.
1171
    if (unlikely (!msg_ || !msg_->check ())) {
1172 1173 1174 1175
        errno = EFAULT;
        return -1;
    }

1176 1177 1178 1179 1180 1181 1182
    //  Once every inbound_poll_rate messages check for signals and process
    //  incoming commands. This happens only if we are not polling altogether
    //  because there are messages available all the time. If poll occurs,
    //  ticks is set to zero and thus we avoid this code.
    //
    //  Note that 'recv' uses different command throttling algorithm (the one
    //  described above) from the one used by 'send'. This is because counting
Martin Sustrik's avatar
Martin Sustrik committed
1183
    //  ticks is more efficient than doing RDTSC all the time.
1184
    if (++ticks == inbound_poll_rate) {
somdoron's avatar
somdoron committed
1185
        if (unlikely (process_commands (0, false) != 0)) {
1186
            return -1;
somdoron's avatar
somdoron committed
1187
        }
1188 1189 1190
        ticks = 0;
    }

Martin Hurton's avatar
Martin Hurton committed
1191
    //  Get the message.
1192
    int rc = xrecv (msg_);
somdoron's avatar
somdoron committed
1193
    if (unlikely (rc != 0 && errno != EAGAIN)) {
Martin Hurton's avatar
Martin Hurton committed
1194
        return -1;
somdoron's avatar
somdoron committed
1195
    }
Martin Hurton's avatar
Martin Hurton committed
1196

1197
    //  If we have the message, return immediately.
1198
    if (rc == 0) {
1199
        extract_flags (msg_);
1200
        return 0;
1201
    }
1202

Martin Sustrik's avatar
Martin Sustrik committed
1203
    //  If the message cannot be fetched immediately, there are two scenarios.
1204
    //  For non-blocking recv, commands are processed in case there's an
Patrik Wenger's avatar
Patrik Wenger committed
1205
    //  activate_reader command already waiting in a command pipe.
1206
    //  If it's not, return EAGAIN.
1207
    if ((flags_ & ZMQ_DONTWAIT) || options.rcvtimeo == 0) {
somdoron's avatar
somdoron committed
1208
        if (unlikely (process_commands (0, false) != 0)) {
1209
            return -1;
somdoron's avatar
somdoron committed
1210
        }
1211
        ticks = 0;
1212

1213
        rc = xrecv (msg_);
somdoron's avatar
somdoron committed
1214
        if (rc < 0) {
1215
            return rc;
somdoron's avatar
somdoron committed
1216
        }
1217
        extract_flags (msg_);
somdoron's avatar
somdoron committed
1218

1219
        return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1220 1221
    }

1222
    //  Compute the time when the timeout should occur.
Matt Arsenault's avatar
Matt Arsenault committed
1223
    //  If the timeout is infinite, don't care.
1224 1225 1226
    int timeout = options.rcvtimeo;
    uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);

1227 1228
    //  In blocking scenario, commands are processed over and over again until
    //  we are able to fetch a message.
1229
    bool block = (ticks != 0);
1230
    while (true) {
somdoron's avatar
somdoron committed
1231
        if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
1232
            return -1;
somdoron's avatar
somdoron committed
1233
        }
1234
        rc = xrecv (msg_);
1235 1236 1237 1238
        if (rc == 0) {
            ticks = 0;
            break;
        }
somdoron's avatar
somdoron committed
1239
        if (unlikely (errno != EAGAIN)) {
1240
            return -1;
somdoron's avatar
somdoron committed
1241
        }
1242
        block = true;
1243
        if (timeout > 0) {
1244
            timeout = static_cast<int> (end - clock.now_ms ());
1245 1246 1247 1248 1249
            if (timeout <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
1250
    }
1251

1252
    extract_flags (msg_);
1253
    return 0;
1254 1255 1256 1257
}

int zmq::socket_base_t::close ()
{
1258
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
1259

1260 1261
    //  Remove all existing signalers for thread safe sockets
    if (thread_safe)
1262
        (static_cast<mailbox_safe_t *> (mailbox))->clear_signalers ();
1263

1264 1265
    //  Mark the socket as dead
    tag = 0xdeadbeef;
1266

1267

1268 1269 1270 1271
    //  Transfer the ownership of the socket from this application thread
    //  to the reaper thread which will take care of the rest of shutdown
    //  process.
    send_reap (this);
1272

1273 1274 1275
    return 0;
}

1276 1277 1278 1279 1280 1281 1282 1283 1284 1285
bool zmq::socket_base_t::has_in ()
{
    return xhas_in ();
}

bool zmq::socket_base_t::has_out ()
{
    return xhas_out ();
}

1286
void zmq::socket_base_t::start_reaping (poller_t *poller_)
1287
{
1288
    //  Plug the socket to the reaper thread.
1289
    poller = poller_;
somdoron's avatar
somdoron committed
1290 1291 1292 1293

    fd_t fd;

    if (!thread_safe)
1294
        fd = (static_cast<mailbox_t *> (mailbox))->get_fd ();
1295
    else {
1296
        scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
1297

1298
        reaper_signaler = new (std::nothrow) signaler_t ();
1299
        zmq_assert (reaper_signaler);
1300

somdoron's avatar
somdoron committed
1301
        //  Add signaler to the safe mailbox
1302
        fd = reaper_signaler->get_fd ();
1303 1304
        (static_cast<mailbox_safe_t *> (mailbox))
          ->add_signaler (reaper_signaler);
somdoron's avatar
somdoron committed
1305 1306

        //  Send a signal to make sure reaper handle existing commands
1307
        reaper_signaler->send ();
somdoron's avatar
somdoron committed
1308 1309 1310
    }

    handle = poller->add_fd (fd, this);
1311
    poller->set_pollin (handle);
1312 1313 1314 1315 1316

    //  Initialise the termination and check whether it can be deallocated
    //  immediately.
    terminate ();
    check_destroy ();
Martin Hurton's avatar
Martin Hurton committed
1317 1318
}

1319
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
Martin Sustrik's avatar
Martin Sustrik committed
1320
{
1321
    int rc;
1322
    command_t cmd;
1323 1324
    if (timeout_ != 0) {
        //  If we are asked to wait, simply ask mailbox to wait.
somdoron's avatar
somdoron committed
1325
        rc = mailbox->recv (&cmd, timeout_);
1326
    } else {
1327 1328 1329
        //  If we are asked not to wait, check whether we haven't processed
        //  commands recently, so that we can throttle the new commands.

Martin Sustrik's avatar
Martin Sustrik committed
1330
        //  Get the CPU's tick counter. If 0, the counter is not available.
Martin Hurton's avatar
Martin Hurton committed
1331
        const uint64_t tsc = zmq::clock_t::rdtsc ();
Martin Sustrik's avatar
Martin Sustrik committed
1332

1333 1334 1335 1336 1337 1338
        //  Optimised version of command processing - it doesn't have to check
        //  for incoming commands each time. It does so only if certain time
        //  elapsed since last command processing. Command delay varies
        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
        //  etc. The optimisation makes sense only on platforms where getting
        //  a timestamp is a very cheap operation (tens of nanoseconds).
Martin Sustrik's avatar
Martin Sustrik committed
1339
        if (tsc && throttle_) {
Martin Sustrik's avatar
Martin Sustrik committed
1340 1341 1342
            //  Check whether TSC haven't jumped backwards (in case of migration
            //  between CPU cores) and whether certain time have elapsed since
            //  last command processing. If it didn't do nothing.
Martin Sustrik's avatar
Martin Sustrik committed
1343
            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
1344
                return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1345
            last_tsc = tsc;
1346 1347 1348
        }

        //  Check whether there are any commands pending for this thread.
somdoron's avatar
somdoron committed
1349
        rc = mailbox->recv (&cmd, 0);
1350
    }
Martin Sustrik's avatar
Martin Sustrik committed
1351

1352 1353
    //  Process all available commands.
    while (rc == 0) {
1354
        cmd.destination->process_command (cmd);
somdoron's avatar
somdoron committed
1355
        rc = mailbox->recv (&cmd, 0);
1356 1357 1358 1359 1360 1361
    }

    if (errno == EINTR)
        return -1;

    zmq_assert (errno == EAGAIN);
1362

1363
    if (ctx_terminated) {
1364 1365
        errno = ETERM;
        return -1;
1366
    }
1367 1368

    return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1369 1370
}

1371
void zmq::socket_base_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
1372
{
1373
    //  Here, someone have called zmq_ctx_term while the socket was still alive.
1374
    //  We'll remember the fact so that any blocking call is interrupted and any
1375 1376
    //  further attempt to use the socket will return ETERM. The user is still
    //  responsible for calling zmq_close on the socket though!
1377
    scoped_lock_t lock (monitor_sync);
1378
    stop_monitor ();
1379

1380
    ctx_terminated = true;
Martin Sustrik's avatar
Martin Sustrik committed
1381 1382
}

1383
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
Martin Sustrik's avatar
Martin Sustrik committed
1384
{
1385
    attach_pipe (pipe_);
Martin Sustrik's avatar
Martin Sustrik committed
1386 1387
}

1388
void zmq::socket_base_t::process_term (int linger_)
1389
{
1390 1391 1392 1393 1394
    //  Unregister all inproc endpoints associated with this socket.
    //  Doing this we make sure that no new pipes from other sockets (inproc)
    //  will be initiated.
    unregister_endpoints (this);

1395 1396
    //  Ask all attached pipes to terminate.
    for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
1397
        pipes[i]->terminate (false);
1398
    register_term_acks (static_cast<int> (pipes.size ()));
1399

1400
    //  Continue the termination process immediately.
1401
    own_t::process_term (linger_);
1402 1403
}

1404 1405
void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
{
1406
    term_endpoint (endpoint_->c_str ());
1407 1408 1409
    delete endpoint_;
}

1410
void zmq::socket_base_t::update_pipe_options (int option_)
1411
{
1412 1413 1414 1415
    if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {
        for (pipes_t::size_type i = 0; i != pipes.size (); ++i) {
            pipes[i]->set_hwms (options.rcvhwm, options.sndhwm);
            pipes[i]->send_hwms_to_peer (options.sndhwm, options.rcvhwm);
1416 1417
        }
    }
1418 1419
}

1420 1421 1422 1423 1424
void zmq::socket_base_t::process_destroy ()
{
    destroyed = true;
}

1425
int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
1426 1427 1428 1429
{
    errno = EINVAL;
    return -1;
}
1430 1431 1432 1433 1434 1435

bool zmq::socket_base_t::xhas_out ()
{
    return false;
}

1436
int zmq::socket_base_t::xsend (msg_t *)
1437 1438 1439 1440 1441 1442 1443 1444 1445 1446
{
    errno = ENOTSUP;
    return -1;
}

bool zmq::socket_base_t::xhas_in ()
{
    return false;
}

somdoron's avatar
somdoron committed
1447 1448
int zmq::socket_base_t::xjoin (const char *group_)
{
1449
    LIBZMQ_UNUSED (group_);
somdoron's avatar
somdoron committed
1450 1451 1452 1453 1454 1455
    errno = ENOTSUP;
    return -1;
}

int zmq::socket_base_t::xleave (const char *group_)
{
1456
    LIBZMQ_UNUSED (group_);
somdoron's avatar
somdoron committed
1457 1458 1459 1460
    errno = ENOTSUP;
    return -1;
}

1461
int zmq::socket_base_t::xrecv (msg_t *)
1462 1463 1464 1465 1466
{
    errno = ENOTSUP;
    return -1;
}

1467 1468 1469
static const zmq::blob_t empty_blob;

const zmq::blob_t &zmq::socket_base_t::get_credential () const
1470
{
1471
    return empty_blob;
1472 1473
}

1474
void zmq::socket_base_t::xread_activated (pipe_t *)
1475 1476 1477
{
    zmq_assert (false);
}
1478
void zmq::socket_base_t::xwrite_activated (pipe_t *)
1479 1480 1481 1482
{
    zmq_assert (false);
}

1483
void zmq::socket_base_t::xhiccuped (pipe_t *)
1484
{
1485
    zmq_assert (false);
1486 1487
}

1488 1489
void zmq::socket_base_t::in_event ()
{
1490 1491 1492 1493
    //  This function is invoked only once the socket is running in the context
    //  of the reaper thread. Process any commands from other threads/sockets
    //  that may be available at the moment. Ultimately, the socket will
    //  be destroyed.
1494 1495
    {
        scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
1496

1497 1498 1499
        //  If the socket is thread safe we need to unsignal the reaper signaler
        if (thread_safe)
            reaper_signaler->recv ();
1500

1501 1502 1503
        process_commands (0, false);
    }
    check_destroy ();
1504 1505 1506 1507 1508 1509 1510
}

void zmq::socket_base_t::out_event ()
{
    zmq_assert (false);
}

1511
void zmq::socket_base_t::timer_event (int)
1512 1513 1514
{
    zmq_assert (false);
}
1515

1516 1517
void zmq::socket_base_t::check_destroy ()
{
1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532
    //  If the object was already marked as destroyed, finish the deallocation.
    if (destroyed) {
        //  Remove the socket from the reaper's poller.
        poller->rm_fd (handle);

        //  Remove the socket from the context.
        destroy_socket (this);

        //  Notify the reaper about the fact.
        send_reaped ();

        //  Deallocate.
        own_t::process_destroy ();
    }
}
1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543

void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
    xread_activated (pipe_);
}

void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
    xwrite_activated (pipe_);
}

1544 1545
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
{
1546
    if (options.immediate == 1)
1547 1548 1549 1550
        pipe_->terminate (false);
    else
        // Notify derived sockets of the hiccup
        xhiccuped (pipe_);
1551 1552
}

1553
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
1554 1555
{
    //  Notify the specific socket type about the pipe termination.
1556
    xpipe_terminated (pipe_);
1557

1558
    // Remove pipe from inproc pipes
Martin Hurton's avatar
Martin Hurton committed
1559
    for (inprocs_t::iterator it = inprocs.begin (); it != inprocs.end (); ++it)
1560
        if (it->second == pipe_) {
Martin Hurton's avatar
Martin Hurton committed
1561
            inprocs.erase (it);
1562
            break;
1563 1564
        }

1565 1566 1567 1568 1569 1570 1571
    //  Remove the pipe from the list of attached pipes and confirm its
    //  termination if we are already shutting down.
    pipes.erase (pipe_);
    if (is_terminating ())
        unregister_term_ack ();
}

1572 1573
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
1574 1575 1576
    //  Test whether routing_id flag is valid for this socket type.
    if (unlikely (msg_->flags () & msg_t::routing_id))
        zmq_assert (options.recv_routing_id);
Martin Hurton's avatar
Martin Hurton committed
1577

1578
    //  Remove MORE flag.
1579 1580
    rcvmore = msg_->flags () & msg_t::more ? true : false;
}
1581

1582
int zmq::socket_base_t::monitor (const char *addr_, int events_)
1583
{
1584 1585
    scoped_lock_t lock (monitor_sync);

1586 1587 1588 1589
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }
1590

1591
    //  Support deregistering monitoring endpoints as well
1592 1593 1594 1595 1596 1597 1598
    if (addr_ == NULL) {
        stop_monitor ();
        return 0;
    }
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
Pieter Hintjens's avatar
Pieter Hintjens committed
1599
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
1600 1601
        return -1;

1602
    //  Event notification only supported over inproc://
1603 1604 1605 1606
    if (protocol != "inproc") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
1607 1608 1609 1610
    // already monitoring. Stop previous monitor before starting new one.
    if (monitor_socket != NULL) {
        stop_monitor (true);
    }
1611
    //  Register events to monitor
1612
    monitor_events = events_;
Martin Hurton's avatar
Martin Hurton committed
1613
    monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
1614 1615 1616
    if (monitor_socket == NULL)
        return -1;

1617
    //  Never block context termination on pending event messages
1618
    int linger = 0;
1619 1620
    int rc =
      zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1621
    if (rc == -1)
1622
        stop_monitor (false);
1623

1624
    //  Spawn the monitor socket endpoint
1625 1626
    rc = zmq_bind (monitor_socket, addr_);
    if (rc == -1)
1627
        stop_monitor (false);
1628 1629 1630
    return rc;
}

1631 1632
void zmq::socket_base_t::event_connected (const std::string &addr_,
                                          zmq::fd_t fd_)
1633
{
1634
    event (addr_, fd_, ZMQ_EVENT_CONNECTED);
1635
}
1636

1637 1638
void zmq::socket_base_t::event_connect_delayed (const std::string &addr_,
                                                int err_)
1639
{
1640
    event (addr_, err_, ZMQ_EVENT_CONNECT_DELAYED);
1641
}
1642

1643 1644
void zmq::socket_base_t::event_connect_retried (const std::string &addr_,
                                                int interval_)
1645
{
1646
    event (addr_, interval_, ZMQ_EVENT_CONNECT_RETRIED);
1647 1648
}

1649 1650
void zmq::socket_base_t::event_listening (const std::string &addr_,
                                          zmq::fd_t fd_)
1651
{
1652
    event (addr_, fd_, ZMQ_EVENT_LISTENING);
1653 1654
}

Martin Hurton's avatar
Martin Hurton committed
1655
void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
1656
{
1657
    event (addr_, err_, ZMQ_EVENT_BIND_FAILED);
1658 1659
}

1660 1661
void zmq::socket_base_t::event_accepted (const std::string &addr_,
                                         zmq::fd_t fd_)
1662
{
1663
    event (addr_, fd_, ZMQ_EVENT_ACCEPTED);
1664 1665
}

1666 1667
void zmq::socket_base_t::event_accept_failed (const std::string &addr_,
                                              int err_)
1668
{
1669
    event (addr_, err_, ZMQ_EVENT_ACCEPT_FAILED);
1670 1671
}

1672
void zmq::socket_base_t::event_closed (const std::string &addr_, zmq::fd_t fd_)
1673
{
1674
    event (addr_, fd_, ZMQ_EVENT_CLOSED);
1675
}
Martin Hurton's avatar
Martin Hurton committed
1676 1677

void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
1678
{
1679
    event (addr_, err_, ZMQ_EVENT_CLOSE_FAILED);
1680 1681
}

1682 1683
void zmq::socket_base_t::event_disconnected (const std::string &addr_,
                                             zmq::fd_t fd_)
1684
{
1685
    event (addr_, fd_, ZMQ_EVENT_DISCONNECTED);
1686 1687
}

1688 1689
void zmq::socket_base_t::event_handshake_failed_no_detail (
  const std::string &addr_, int err_)
Vincent Tellier's avatar
Vincent Tellier committed
1690
{
1691
    event (addr_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
Vincent Tellier's avatar
Vincent Tellier committed
1692 1693
}

1694 1695
void zmq::socket_base_t::event_handshake_failed_protocol (
  const std::string &addr_, int err_)
1696
{
1697
    event (addr_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
1698 1699
}

1700 1701
void zmq::socket_base_t::event_handshake_failed_auth (const std::string &addr_,
                                                      int err_)
1702
{
1703
    event (addr_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
1704 1705 1706 1707 1708 1709
}

void zmq::socket_base_t::event_handshake_succeeded (const std::string &addr_,
                                                    int err_)
{
    event (addr_, err_, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
Vincent Tellier's avatar
Vincent Tellier committed
1710 1711
}

1712 1713 1714
void zmq::socket_base_t::event (const std::string &addr_,
                                intptr_t value_,
                                int type_)
1715
{
1716 1717
    scoped_lock_t lock (monitor_sync);
    if (monitor_events & type_) {
1718
        monitor_event (type_, value_, addr_);
1719
    }
1720 1721
}

1722
//  Send a monitor event
1723 1724 1725
void zmq::socket_base_t::monitor_event (int event_,
                                        intptr_t value_,
                                        const std::string &addr_)
1726
{
1727 1728 1729
    // this is a private method which is only called from
    // contexts where the mutex has been locked before

1730
    if (monitor_socket) {
1731
        //  Send event in first frame
1732
        zmq_msg_t msg;
1733
        zmq_msg_init_size (&msg, 6);
1734
        uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
1735
        //  Avoid dereferencing uint32_t on unaligned address
1736 1737
        uint16_t event = static_cast<uint16_t> (event_);
        uint32_t value = static_cast<uint32_t> (value_);
1738 1739
        memcpy (data + 0, &event, sizeof (event));
        memcpy (data + 2, &value, sizeof (value));
1740
        zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
1741 1742

        //  Send address in second frame
1743
        zmq_msg_init_size (&msg, addr_.size ());
1744
        memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
1745 1746
        zmq_sendmsg (monitor_socket, &msg, 0);
    }
1747 1748
}

1749
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
1750
{
1751 1752 1753
    // this is a private method which is only called from
    // contexts where the mutex has been locked before

1754
    if (monitor_socket) {
1755 1756
        if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
            && send_monitor_stopped_event_)
1757
            monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
1758 1759 1760 1761
        zmq_close (monitor_socket);
        monitor_socket = NULL;
        monitor_events = 0;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
1762
}