/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include <new>
31
#include <string>
32 33
#include <algorithm>

34
#include "macros.hpp"
35 36 37 38 39
#include "platform.hpp"

#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#if defined _MSC_VER
40
#if defined _WIN32_WCE
boris@boressoft.ru's avatar
boris@boressoft.ru committed
41 42
#include <cmnintrin.h>
#else
43 44
#include <intrin.h>
#endif
boris@boressoft.ru's avatar
boris@boressoft.ru committed
45
#endif
46 47
#else
#include <unistd.h>
48
#include <ctype.h>
49
#endif
50

51
#include "socket_base.hpp"
52
#include "tcp_listener.hpp"
53
#include "ipc_listener.hpp"
54
#include "tipc_listener.hpp"
55
#include "tcp_connecter.hpp"
56
#include "io_thread.hpp"
57
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
58
#include "config.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
59
#include "pipe.hpp"
60
#include "err.hpp"
61
#include "ctx.hpp"
malosek's avatar
malosek committed
62
#include "platform.hpp"
63
#include "likely.hpp"
64
#include "msg.hpp"
65 66 67
#include "address.hpp"
#include "ipc_address.hpp"
#include "tcp_address.hpp"
68
#include "udp_address.hpp"
69
#include "tipc_address.hpp"
somdoron's avatar
somdoron committed
70 71
#include "mailbox.hpp"
#include "mailbox_safe.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
72 73 74 75 76 77

#if defined ZMQ_HAVE_VMCI
#include "vmci_address.hpp"
#include "vmci_listener.hpp"
#endif

78 79 80
#ifdef ZMQ_HAVE_OPENPGM
#include "pgm_socket.hpp"
#endif
81

82 83 84 85 86 87 88
#include "pair.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "pull.hpp"
#include "push.hpp"
89 90
#include "dealer.hpp"
#include "router.hpp"
91 92
#include "xpub.hpp"
#include "xsub.hpp"
93
#include "stream.hpp"
94
#include "server.hpp"
95
#include "client.hpp"
somdoron's avatar
somdoron committed
96 97
#include "radio.hpp"
#include "dish.hpp"
98

somdoron's avatar
somdoron committed
99 100 101 102
//  Lock the socket's 'sync' mutex, but only when the socket was created as
//  thread-safe. Both 'thread_safe' and 'sync' must be in scope at the point
//  of use. The expansion is a single braced block so that the macro always
//  behaves as exactly one statement, with or without a trailing semicolon
//  at the call site.
#define ENTER_MUTEX() \
    { \
        if (thread_safe) \
            sync.lock (); \
    }

//  Counterpart of ENTER_MUTEX: unlock 'sync' for thread-safe sockets.
//  The expansion must not begin with an empty statement; otherwise
//  'if (cond) EXIT_MUTEX ();' would guard only the empty statement and
//  unlock unconditionally.
#define EXIT_MUTEX() \
    { \
        if (thread_safe) \
            sync.unlock (); \
    }

107 108 109 110 111
bool zmq::socket_base_t::check_tag ()
{
    //  The constructor stamps every socket with this magic value; report
    //  whether the stamp is still intact.
    const bool tag_intact = (tag == 0xbaddecaf);
    return tag_intact;
}

112
//  Factory for sockets: instantiates the concrete socket class matching
//  'type_'. Returns NULL with errno set to EINVAL for an unknown type, and
//  NULL as well when the freshly created socket's mailbox has no usable
//  file descriptor.
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
    uint32_t tid_, int sid_)
{
    socket_base_t *sock = NULL;

    if (type_ == ZMQ_PAIR)
        sock = new (std::nothrow) pair_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_PUB)
        sock = new (std::nothrow) pub_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_SUB)
        sock = new (std::nothrow) sub_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_REQ)
        sock = new (std::nothrow) req_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_REP)
        sock = new (std::nothrow) rep_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_DEALER)
        sock = new (std::nothrow) dealer_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_ROUTER)
        sock = new (std::nothrow) router_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_PULL)
        sock = new (std::nothrow) pull_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_PUSH)
        sock = new (std::nothrow) push_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_XPUB)
        sock = new (std::nothrow) xpub_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_XSUB)
        sock = new (std::nothrow) xsub_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_STREAM)
        sock = new (std::nothrow) stream_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_SERVER)
        sock = new (std::nothrow) server_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_CLIENT)
        sock = new (std::nothrow) client_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_RADIO)
        sock = new (std::nothrow) radio_t (parent_, tid_, sid_);
    else if (type_ == ZMQ_DISH)
        sock = new (std::nothrow) dish_t (parent_, tid_, sid_);
    else {
        //  Not a socket type we know how to build.
        errno = EINVAL;
        return NULL;
    }

    alloc_assert (sock);

    //  Only a plain mailbox_t exposes a file descriptor; for any other
    //  mailbox implementation the cast yields NULL and the check is skipped.
    mailbox_t *plain_mailbox = dynamic_cast<mailbox_t*> (sock->mailbox);
    const bool fd_unusable =
        plain_mailbox != NULL && plain_mailbox->get_fd () == retired_fd;
    if (!fd_unusable)
        return sock;

    //  The destructor asserts 'destroyed', so mark the socket before
    //  disposing of it.
    sock->destroyed = true;
    LIBZMQ_DELETE(sock);
    return NULL;
}

somdoron's avatar
somdoron committed
183
//  Constructs the common part of every socket.
//    parent_      - owning context; also queried for the ZMQ_IPV6 and
//                   ZMQ_BLOCKY settings that seed the socket options.
//    tid_         - thread/slot id forwarded to own_t.
//    sid_         - socket id, stored in the options.
//    thread_safe_ - selects the mutex-protected mailbox implementation.
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
    own_t (parent_, tid_),
    tag (0xbaddecaf),
    ctx_terminated (false),
    destroyed (false),
    last_tsc (0),
    ticks (0),
    rcvmore (false),
    file_desc(-1),
    monitor_socket (NULL),
    monitor_events (0),
    thread_safe (thread_safe_),
    reaper_signaler (NULL)
{
    options.socket_id = sid_;
    options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
    //  A "blocky" context lingers forever by default, otherwise not at all.
    options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;

    //  Allocate the mailbox the same way every other object in this file is
    //  allocated: non-throwing new followed by alloc_assert, so that memory
    //  exhaustion is reported consistently instead of via an exception.
    if (thread_safe)
        mailbox = new (std::nothrow) mailbox_safe_t (&sync);
    else
        mailbox = new (std::nothrow) mailbox_t ();
    alloc_assert (mailbox);
}

zmq::socket_base_t::~socket_base_t ()
{
    //  Release the mailbox allocated by the constructor.
    LIBZMQ_DELETE(mailbox);

    //  The reaper signaler starts out as NULL and may never have been set.
    if (reaper_signaler != NULL) {
        LIBZMQ_DELETE(reaper_signaler);
    }

    stop_monitor ();

    //  A socket must have been flagged as destroyed before it is deleted.
    zmq_assert (destroyed);
}

somdoron's avatar
somdoron committed
219
//  Gives access to the socket's mailbox through its generic interface,
//  regardless of which concrete mailbox the constructor picked.
zmq::i_mailbox *zmq::socket_base_t::get_mailbox ()
{
    i_mailbox *const current_mailbox = mailbox;
    return current_mailbox;
}

void zmq::socket_base_t::stop ()
{
226 227
    //  Called by ctx when it is terminated (zmq_ctx_term).
    //  'stop' command is sent from the threads that called zmq_ctx_term to
228 229 230 231 232
    //  the thread owning the socket. This way, blocking call in the
    //  owner thread can be interrupted.
    send_stop ();
}

233 234 235 236 237 238 239 240 241 242 243 244 245
//  Splits "protocol://address" into its two halves.
//  On success returns 0 and fills protocol_ and address_. Returns -1 with
//  errno set to EINVAL when the "://" separator is missing (outputs are
//  left untouched) or when either half is empty.
int zmq::socket_base_t::parse_uri (const char *uri_,
                        std::string &protocol_, std::string &address_)
{
    zmq_assert (uri_ != NULL);

    const std::string full_uri (uri_);
    const std::string::size_type separator_at = full_uri.find ("://");

    if (separator_at != std::string::npos) {
        //  Everything before the separator is the protocol; everything
        //  after its three characters is the address.
        protocol_.assign (full_uri, 0, separator_at);
        address_.assign (full_uri, separator_at + 3, std::string::npos);

        if (!protocol_.empty () && !address_.empty ())
            return 0;
    }

    errno = EINVAL;
    return -1;
}

254 255
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
256
    //  First check out whether the protocol is something we are aware of.
Pieter Hintjens's avatar
Pieter Hintjens committed
257 258 259 260 261 262
    if (protocol_ != "inproc"
    &&  protocol_ != "ipc"
    &&  protocol_ != "tcp"
    &&  protocol_ != "pgm"
    &&  protocol_ != "epgm"
    &&  protocol_ != "tipc"
Ilya Kulakov's avatar
Ilya Kulakov committed
263
    &&  protocol_ != "norm"
264 265
    &&  protocol_ != "vmci"
    &&  protocol_ != "udp") {
266 267 268 269
        errno = EPROTONOSUPPORT;
        return -1;
    }
    //  If 0MQ is not compiled with OpenPGM, pgm and epgm transports
270
    //  are not available.
271 272 273 274 275 276
#if !defined ZMQ_HAVE_OPENPGM
    if (protocol_ == "pgm" || protocol_ == "epgm") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif
Martin Hurton's avatar
Martin Hurton committed
277

bebopagogo's avatar
bebopagogo committed
278 279 280 281 282 283
#if !defined ZMQ_HAVE_NORM
    if (protocol_ == "norm") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif // !ZMQ_HAVE_NORM
284 285 286

    //  IPC transport is not available on Windows and OpenVMS.
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
287
    if (protocol_ == "ipc") {
288 289 290 291 292 293
        //  Unknown protocol.
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

294
    // TIPC transport is only available on Linux.
295
#if !defined ZMQ_HAVE_TIPC
Erik Hugne's avatar
Erik Hugne committed
296
    if (protocol_ == "tipc") {
297 298 299 300 301
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

Ilya Kulakov's avatar
Ilya Kulakov committed
302 303 304 305 306 307 308
#if !defined ZMQ_HAVE_VMCI
    if (protocol_ == "vmci") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

309 310 311
    //  Check whether socket type and transport protocol match.
    //  Specifically, multicast protocols can't be combined with
    //  bi-directional messaging patterns (socket types).
bebopagogo's avatar
bebopagogo committed
312
    if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") &&
313 314
          options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
          options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
315 316 317 318
        errno = ENOCOMPATPROTO;
        return -1;
    }

319 320 321
    if (protocol_ == "udp" && (options.type != ZMQ_DISH && options.type != ZMQ_RADIO))
        return -1;

322 323 324 325
    //  Protocol is available.
    return 0;
}

326
//  Takes charge of a new pipe: registers it with this socket and hands it
//  to the concrete socket type.
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
    //  Register first, so that the pipe can be terminated later on: this
    //  socket becomes its event sink and keeps it in the pipe list.
    pipe_->set_event_sink (this);
    pipes.push_back (pipe_);

    //  Notify the derived socket type about the new pipe.
    xattach_pipe (pipe_, subscribe_to_all_);

    if (!is_terminating ())
        return;

    //  The socket is already closing: ask the newcomer to terminate straight
    //  away and account for the acknowledgement it will produce.
    register_term_acks (1);
    pipe_->terminate (false);
}

343
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
344
    size_t optvallen_)
345
{
346
    ENTER_MUTEX ();
somdoron's avatar
somdoron committed
347

348 349
    if (!options.is_valid(option_)) {
        errno = EINVAL;
350
        EXIT_MUTEX ();
351 352
        return -1;
    }
353

354
    if (unlikely (ctx_terminated)) {
355
        errno = ETERM;
356
        EXIT_MUTEX ();
357 358 359
        return -1;
    }

360 361
    //  First, check whether specific socket type overloads the option.
    int rc = xsetsockopt (option_, optval_, optvallen_);
somdoron's avatar
somdoron committed
362
    if (rc == 0 || errno != EINVAL) {
363
        EXIT_MUTEX ();
364
        return rc;
somdoron's avatar
somdoron committed
365
    }
366 367 368

    //  If the socket type doesn't support the option, pass it to
    //  the generic option parser.
somdoron's avatar
somdoron committed
369
    rc = options.setsockopt (option_, optval_, optvallen_);
370
    update_pipe_options(option_);
somdoron's avatar
somdoron committed
371

372
    EXIT_MUTEX ();
somdoron's avatar
somdoron committed
373
    return rc;
374 375
}

376 377 378
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
    size_t *optvallen_)
{
379
    ENTER_MUTEX ();
somdoron's avatar
somdoron committed
380

381
    if (unlikely (ctx_terminated)) {
382
        errno = ETERM;
383
        EXIT_MUTEX ();
384 385 386
        return -1;
    }

387
    if (option_ == ZMQ_RCVMORE) {
388
        if (*optvallen_ < sizeof (int)) {
389
            errno = EINVAL;
390
            EXIT_MUTEX ();
391 392
            return -1;
        }
393
        memset(optval_, 0, *optvallen_);
394 395
        *((int*) optval_) = rcvmore ? 1 : 0;
        *optvallen_ = sizeof (int);
396
        EXIT_MUTEX ();
397 398 399
        return 0;
    }

400 401 402
    if (option_ == ZMQ_FD) {
        if (*optvallen_ < sizeof (fd_t)) {
            errno = EINVAL;
403
            EXIT_MUTEX ();
404 405
            return -1;
        }
somdoron's avatar
somdoron committed
406 407 408 409

        if (thread_safe) {
            // thread safe socket doesn't provide file descriptor
            errno = EINVAL;
410
            EXIT_MUTEX ();
somdoron's avatar
somdoron committed
411 412
            return -1;
        }
413

somdoron's avatar
somdoron committed
414
        *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
415 416
        *optvallen_ = sizeof(fd_t);

417
        EXIT_MUTEX ();
418 419 420 421
        return 0;
    }

    if (option_ == ZMQ_EVENTS) {
422
        if (*optvallen_ < sizeof (int)) {
423
            errno = EINVAL;
424
            EXIT_MUTEX ();
425 426
            return -1;
        }
427
        int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
428
        if (rc != 0 && (errno == EINTR || errno == ETERM)) {
429
            EXIT_MUTEX ();
430
            return -1;
somdoron's avatar
somdoron committed
431
        }
432
        errno_assert (rc == 0);
433
        *((int*) optval_) = 0;
434
        if (has_out ())
435
            *((int*) optval_) |= ZMQ_POLLOUT;
436
        if (has_in ())
437 438
            *((int*) optval_) |= ZMQ_POLLIN;
        *optvallen_ = sizeof (int);
439
        EXIT_MUTEX ();
440 441 442
        return 0;
    }

443 444 445
    if (option_ == ZMQ_LAST_ENDPOINT) {
        if (*optvallen_ < last_endpoint.size () + 1) {
            errno = EINVAL;
446
            EXIT_MUTEX ();
447 448 449 450
            return -1;
        }
        strcpy (static_cast <char *> (optval_), last_endpoint.c_str ());
        *optvallen_ = last_endpoint.size () + 1;
451
        EXIT_MUTEX ();
452 453 454
        return 0;
    }

455 456 457
    if (option_ == ZMQ_THREAD_SAFE) {
        if (*optvallen_ < sizeof (int)) {
            errno = EINVAL;
458
            EXIT_MUTEX ();
459 460 461 462 463
            return -1;
        }
        memset(optval_, 0, *optvallen_);
        *((int*) optval_) = thread_safe	? 1 : 0;
        *optvallen_ = sizeof (int);
464
        EXIT_MUTEX ();
465
        return 0;
Ilya Kulakov's avatar
Ilya Kulakov committed
466
    }
467

somdoron's avatar
somdoron committed
468
    int rc = options.getsockopt (option_, optval_, optvallen_);
469
    EXIT_MUTEX ();
somdoron's avatar
somdoron committed
470
    return rc;
471 472
}

somdoron's avatar
somdoron committed
473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494
//  Joins the group named 'group_'. The actual work is done by the socket
//  type's xjoin, executed under the socket mutex for thread-safe sockets;
//  its return value is passed through unchanged.
int zmq::socket_base_t::join (const char* group_)
{
    ENTER_MUTEX ();
    const int result = xjoin (group_);
    EXIT_MUTEX ();

    return result;
}

//  Leaves the group named 'group_'. The actual work is done by the socket
//  type's xleave, executed under the socket mutex for thread-safe sockets;
//  its return value is passed through unchanged.
int zmq::socket_base_t::leave (const char* group_)
{
    ENTER_MUTEX ();
    const int result = xleave (group_);
    EXIT_MUTEX ();

    return result;
}

495 496
//  Registers an additional signaler with the mailbox. Only thread-safe
//  sockets own a mailbox_safe_t, so any other socket is refused with
//  EINVAL. Returns 0 on success, -1 on failure.
int zmq::socket_base_t::add_signaler(signaler_t *s_)
{
    ENTER_MUTEX ();

    int result = -1;
    if (thread_safe) {
        static_cast <mailbox_safe_t *> (mailbox)->add_signaler (s_);
        result = 0;
    }
    else
        errno = EINVAL;

    EXIT_MUTEX ();
    return result;
}

//  Unregisters a signaler from the mailbox. Only thread-safe sockets own a
//  mailbox_safe_t, so any other socket is refused with EINVAL.
//  Returns 0 on success, -1 on failure.
int zmq::socket_base_t::remove_signaler(signaler_t *s_)
{
    ENTER_MUTEX ();

    int result = -1;
    if (thread_safe) {
        static_cast <mailbox_safe_t *> (mailbox)->remove_signaler (s_);
        result = 0;
    }
    else
        errno = EINVAL;

    EXIT_MUTEX ();
    return result;
}

527 528
int zmq::socket_base_t::bind (const char *addr_)
{
529
    ENTER_MUTEX ();
somdoron's avatar
somdoron committed
530

531
    if (unlikely (ctx_terminated)) {
532
        errno = ETERM;
533
        EXIT_MUTEX ();
534 535 536
        return -1;
    }

537 538
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
539
    if (unlikely (rc != 0)) {
540
        EXIT_MUTEX ();
541
        return -1;
somdoron's avatar
somdoron committed
542
    }
543

544
    //  Parse addr_ string.
545 546
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
547
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
548
        EXIT_MUTEX ();
549
        return -1;
somdoron's avatar
somdoron committed
550
    }
551

Pieter Hintjens's avatar
Pieter Hintjens committed
552
    if (protocol == "inproc") {
553
        const endpoint_t endpoint = { this, options };
Martin Hurton's avatar
Martin Hurton committed
554
        const int rc = register_endpoint (addr_, endpoint);
555
        if (rc == 0) {
Martin Hurton's avatar
Martin Hurton committed
556
            connect_pending (addr_, this);
557
            last_endpoint.assign (addr_);
558
            options.connected = true;
559
        }
560
        EXIT_MUTEX ();
561
        return rc;
562
    }
563

564
    if (protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp") {
565
        //  For convenience's sake, bind can be used interchangeable with
566
        //  connect for PGM, EPGM, NORM and UDP transports.
567
        EXIT_MUTEX ();
568 569 570 571
        rc = connect (addr_);
        if (rc != -1)
            options.connected = true;
        return rc;
572 573
    }

574
    //  Remaining transports require to be run in an I/O thread, so at this
575 576 577 578
    //  point we'll choose one.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
579
        EXIT_MUTEX ();
580 581
        return -1;
    }
582

583
    if (protocol == "tcp") {
584
        tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
585
            io_thread, this, options);
586
        alloc_assert (listener);
587
        int rc = listener->set_address (address.c_str ());
588
        if (rc != 0) {
589
            LIBZMQ_DELETE(listener);
590
            event_bind_failed (address, zmq_errno());
591
            EXIT_MUTEX ();
592
            return -1;
593
        }
594

595
        // Save last endpoint URI
596
        listener->get_address (last_endpoint);
597

598
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
599
        options.connected = true;
600
        EXIT_MUTEX ();
601 602 603
        return 0;
    }

604
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
605 606 607 608
    if (protocol == "ipc") {
        ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
609
        int rc = listener->set_address (address.c_str ());
610
        if (rc != 0) {
611
            LIBZMQ_DELETE(listener);
612
            event_bind_failed (address, zmq_errno());
613
            EXIT_MUTEX ();
614 615
            return -1;
        }
616

617
        // Save last endpoint URI
618
        listener->get_address (last_endpoint);
619

620
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
621
        options.connected = true;
622
        EXIT_MUTEX ();
623
        return 0;
624
    }
625
#endif
626
#if defined ZMQ_HAVE_TIPC
627 628 629 630 631 632
    if (protocol == "tipc") {
         tipc_listener_t *listener = new (std::nothrow) tipc_listener_t (
              io_thread, this, options);
         alloc_assert (listener);
         int rc = listener->set_address (address.c_str ());
         if (rc != 0) {
633
             LIBZMQ_DELETE(listener);
634
             event_bind_failed (address, zmq_errno());
635
             EXIT_MUTEX ();
636 637 638 639 640 641 642
             return -1;
         }

        // Save last endpoint URI
        listener->get_address (last_endpoint);

        add_endpoint (addr_, (own_t *) listener, NULL);
643
        options.connected = true;
644
        EXIT_MUTEX ();
645 646 647
        return 0;
    }
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
648 649 650 651 652 653 654 655 656
#if defined ZMQ_HAVE_VMCI
    if (protocol == "vmci") {
        vmci_listener_t *listener = new (std::nothrow) vmci_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
        int rc = listener->set_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(listener);
            event_bind_failed (address, zmq_errno ());
657
            EXIT_MUTEX ();
Ilya Kulakov's avatar
Ilya Kulakov committed
658 659 660 661 662 663 664
            return -1;
        }

        listener->get_address (last_endpoint);

        add_endpoint (last_endpoint.c_str(), (own_t *) listener, NULL);
        options.connected = true;
665
        EXIT_MUTEX ();
Ilya Kulakov's avatar
Ilya Kulakov committed
666 667 668
        return 0;
    }
#endif
669

670
    EXIT_MUTEX ();
671
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
672
    return -1;
673 674
}

675
int zmq::socket_base_t::connect (const char *addr_)
676
{
677
    ENTER_MUTEX ();
somdoron's avatar
somdoron committed
678

679
    if (unlikely (ctx_terminated)) {
680
        errno = ETERM;
681
        EXIT_MUTEX ();
682 683 684
        return -1;
    }

685 686
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
687
    if (unlikely (rc != 0)) {
688
        EXIT_MUTEX ();
689
        return -1;
somdoron's avatar
somdoron committed
690
    }
691

malosek's avatar
malosek committed
692
    //  Parse addr_ string.
693 694
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
695
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
696
        EXIT_MUTEX ();
697
        return -1;
somdoron's avatar
somdoron committed
698
    }
malosek's avatar
malosek committed
699

Pieter Hintjens's avatar
Pieter Hintjens committed
700
    if (protocol == "inproc") {
701

702 703 704 705
        //  TODO: inproc connect is specific with respect to creating pipes
        //  as there's no 'reconnect' functionality implemented. Once that
        //  is in place we should follow generic pipe creation algorithm.

706 707
        //  Find the peer endpoint.
        endpoint_t peer = find_endpoint (addr_);
708

709
        // The total HWM for an inproc connection should be the sum of
710
        // the binder's HWM and the connector's HWM.
Martin Hurton's avatar
Martin Hurton committed
711
        int sndhwm = 0;
712 713 714
        if (peer.socket == NULL)
            sndhwm = options.sndhwm;
        else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
715
            sndhwm = options.sndhwm + peer.options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
716
        int rcvhwm = 0;
717 718
        if (peer.socket == NULL)
            rcvhwm = options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
719 720
        else
        if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
721
            rcvhwm = options.rcvhwm + peer.options.sndhwm;
722

723
        //  Create a bi-directional pipe to connect the peers.
724
        object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
725
        pipe_t *new_pipes [2] = {NULL, NULL};
726 727 728 729 730 731 732 733 734 735

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
        bool conflates [2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
736
        int rc = pipepair (parents, new_pipes, hwms, conflates);
737 738 739 740 741
        if (!conflate) {
            new_pipes[0]->set_hwms_boost(peer.options.sndhwm, peer.options.rcvhwm);
            new_pipes[1]->set_hwms_boost(options.sndhwm, options.rcvhwm);
        }

742
        errno_assert (rc == 0);
743

744 745 746 747 748 749 750 751 752 753 754 755 756 757
        if (!peer.socket) {
            //  The peer doesn't exist yet so we don't know whether
            //  to send the identity message or not. To resolve this,
            //  we always send our identity and drop it later if
            //  the peer doesn't expect it.
            msg_t id;
            rc = id.init_size (options.identity_size);
            errno_assert (rc == 0);
            memcpy (id.data (), options.identity, options.identity_size);
            id.set_flags (msg_t::identity);
            bool written = new_pipes [0]->write (&id);
            zmq_assert (written);
            new_pipes [0]->flush ();

Martin Hurton's avatar
Martin Hurton committed
758 759
            const endpoint_t endpoint = {this, options};
            pend_connection (std::string (addr_), endpoint, new_pipes);
760
        }
Martin Hurton's avatar
Martin Hurton committed
761
        else {
762 763 764 765 766 767 768 769 770 771 772
            //  If required, send the identity of the local socket to the peer.
            if (peer.options.recv_identity) {
                msg_t id;
                rc = id.init_size (options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), options.identity, options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [0]->write (&id);
                zmq_assert (written);
                new_pipes [0]->flush ();
            }
773

774 775 776 777 778 779 780 781 782 783 784
            //  If required, send the identity of the peer to the local socket.
            if (options.recv_identity) {
                msg_t id;
                rc = id.init_size (peer.options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), peer.options.identity, peer.options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [1]->write (&id);
                zmq_assert (written);
                new_pipes [1]->flush ();
            }
785

786 787 788 789 790
            //  Attach remote end of the pipe to the peer socket. Note that peer's
            //  seqnum was incremented in find_endpoint function. We don't need it
            //  increased here.
            send_bind (peer.socket, new_pipes [1], false);
        }
791

Martin Hurton's avatar
Martin Hurton committed
792 793 794
        //  Attach local end of the pipe to this socket object.
        attach_pipe (new_pipes [0]);

795
        // Save last endpoint URI
796
        last_endpoint.assign (addr_);
797

798
        // remember inproc connections for disconnect
Martin Hurton's avatar
Martin Hurton committed
799
        inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
800

801
        options.connected = true;
802
        EXIT_MUTEX ();
803 804
        return 0;
    }
805 806 807 808
    bool is_single_connect = (options.type == ZMQ_DEALER ||
                              options.type == ZMQ_SUB ||
                              options.type == ZMQ_REQ);
    if (unlikely (is_single_connect)) {
Martin Hurton's avatar
Martin Hurton committed
809
        const endpoints_t::iterator it = endpoints.find (addr_);
810 811 812 813
        if (it != endpoints.end ()) {
            // There is no valid use for multiple connects for SUB-PUB nor
            // DEALER-ROUTER nor REQ-REP. Multiple connects produces
            // nonsensical results.
814
            EXIT_MUTEX ();
815 816 817
            return 0;
        }
    }
818

819 820 821 822
    //  Choose the I/O thread to run the session in.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
823
        EXIT_MUTEX ();
824 825 826
        return -1;
    }

Ilya Kulakov's avatar
Ilya Kulakov committed
827
    address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ());
828
    alloc_assert (paddr);
829 830 831

    //  Resolve address (if needed by the protocol)
    if (protocol == "tcp") {
832 833 834
        //  Do some basic sanity checks on tcp:// address syntax
        //  - hostname starts with digit or letter, with embedded '-' or '.'
        //  - IPv6 address may contain hex chars and colons.
835 836
        //  - IPv6 link local address may contain % followed by interface name / zone_id
        //    (Reference: https://tools.ietf.org/html/rfc4007)
837 838 839 840 841 842
        //  - IPv4 address may contain decimal digits and dots.
        //  - Address must end in ":port" where port is *, or numeric
        //  - Address may contain two parts separated by ':'
        //  Following code is quick and dirty check to catch obvious errors,
        //  without trying to be fully accurate.
        const char *check = address.c_str ();
843
        if (isalnum (*check) || isxdigit (*check) || *check == '[') {
844 845 846
            check++;
            while (isalnum  (*check)
                || isxdigit (*check)
847
                || *check == '.' || *check == '-' || *check == ':' || *check == '%'
848 849
                || *check == ';' || *check == ']' || *check == '_'
            ) {
850
                check++;
851
            }
852 853 854 855 856 857 858 859 860 861 862 863 864 865
        }
        //  Assume the worst, now look for success
        rc = -1;
        //  Did we reach the end of the address safely?
        if (*check == 0) {
            //  Do we have a valid port string? (cannot be '*' in connect
            check = strrchr (address.c_str (), ':');
            if (check) {
                check++;
                if (*check && (isdigit (*check)))
                    rc = 0;     //  Valid
            }
        }
        if (rc == -1) {
866
            errno = EINVAL;
867
            LIBZMQ_DELETE(paddr);
868
            EXIT_MUTEX ();
869 870 871
            return -1;
        }
        //  Defer resolution until a socket is opened
872
        paddr->resolved.tcp_addr = NULL;
873
    }
874
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
875 876
    else
    if (protocol == "ipc") {
877
        paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
878
        alloc_assert (paddr->resolved.ipc_addr);
879 880
        int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
        if (rc != 0) {
881
            LIBZMQ_DELETE(paddr);
882
            EXIT_MUTEX ();
883 884 885
            return -1;
        }
    }
886
#endif
887

888 889 890 891 892 893 894 895 896 897 898
if (protocol  == "udp") {
    paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
    alloc_assert (paddr->resolved.udp_addr);
    int rc = paddr->resolved.udp_addr->resolve (address.c_str());
    if (rc != 0) {
        LIBZMQ_DELETE(paddr);
        EXIT_MUTEX ();
        return -1;
    }
}

bebopagogo's avatar
bebopagogo committed
899
// TBD - Should we check address for ZMQ_HAVE_NORM???
900

901 902 903 904 905 906 907
#ifdef ZMQ_HAVE_OPENPGM
    if (protocol == "pgm" || protocol == "epgm") {
        struct pgm_addrinfo_t *res = NULL;
        uint16_t port_number = 0;
        int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number);
        if (res != NULL)
            pgm_freeaddrinfo (res);
908
        if (rc != 0 || port_number == 0) {
909
          EXIT_MUTEX ();
910 911
          return -1;
        }
912
    }
913
#endif
914
#if defined ZMQ_HAVE_TIPC
915 916 917 918 919 920
    else
    if (protocol == "tipc") {
        paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
        alloc_assert (paddr->resolved.tipc_addr);
        int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
        if (rc != 0) {
921
            LIBZMQ_DELETE(paddr);
922
            EXIT_MUTEX ();
923 924 925 926
            return -1;
        }
    }
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
927 928 929 930 931 932 933 934
#if defined ZMQ_HAVE_VMCI
    else
    if (protocol == "vmci") {
        paddr->resolved.vmci_addr = new (std::nothrow) vmci_address_t (this->get_ctx ());
        alloc_assert (paddr->resolved.vmci_addr);
        int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(paddr);
935
            EXIT_MUTEX ();
Ilya Kulakov's avatar
Ilya Kulakov committed
936 937 938 939
            return -1;
        }
    }
#endif
940

941
    //  Create session.
942
    session_base_t *session = session_base_t::create (io_thread, true, this,
943
        options, paddr);
944
    errno_assert (session);
945

946
    //  PGM does not support subscription forwarding; ask for all data to be
bebopagogo's avatar
bebopagogo committed
947
    //  sent to this pipe. (same for NORM, currently?)
948
    bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp";
949
    pipe_t *newpipe = NULL;
950

951
    if (options.immediate != 1 || subscribe_to_all) {
952 953
        //  Create a bi-directional pipe.
        object_t *parents [2] = {this, session};
954
        pipe_t *new_pipes [2] = {NULL, NULL};
955 956 957 958 959 960 961 962 963 964 965

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : options.sndhwm,
            conflate? -1 : options.rcvhwm};
        bool conflates [2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
966
        rc = pipepair (parents, new_pipes, hwms, conflates);
967
        errno_assert (rc == 0);
968

969
        //  Attach local end of the pipe to the socket object.
970
        attach_pipe (new_pipes [0], subscribe_to_all);
971
        newpipe = new_pipes [0];
Martin Sustrik's avatar
Martin Sustrik committed
972

973
        //  Attach remote end of the pipe to the session object later on.
974
        session->attach_pipe (new_pipes [1]);
975 976 977
    }

    //  Save last endpoint URI
978
    paddr->to_string (last_endpoint);
979

980
    add_endpoint (addr_, (own_t *) session, newpipe);
981
    EXIT_MUTEX ();
982 983 984
    return 0;
}

985
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
986
{
987
    //  Activate the session. Make it a child of this socket.
988
    launch_child (endpoint_);
Martin Hurton's avatar
Martin Hurton committed
989
    endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe)));
990 991 992 993
}

int zmq::socket_base_t::term_endpoint (const char *addr_)
{
994
    ENTER_MUTEX ();
somdoron's avatar
somdoron committed
995

996 997 998
    //  Check whether the library haven't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
999
        EXIT_MUTEX ();
1000 1001
        return -1;
    }
malosek's avatar
malosek committed
1002

1003
    //  Check whether endpoint address passed to the function is valid.
1004 1005
    if (unlikely (!addr_)) {
        errno = EINVAL;
1006
        EXIT_MUTEX ();
1007 1008 1009
        return -1;
    }

1010 1011 1012
    //  Process pending commands, if any, since there could be pending unprocessed process_own()'s
    //  (from launch_child() for example) we're asked to terminate now.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
1013
    if (unlikely(rc != 0)) {
1014
        EXIT_MUTEX ();
1015
        return -1;
somdoron's avatar
somdoron committed
1016
    }
1017

1018 1019 1020
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
1021
    if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) {
1022
        EXIT_MUTEX ();
1023
        return -1;
somdoron's avatar
somdoron committed
1024
    }
1025 1026 1027

    // Disconnect an inproc socket
    if (protocol == "inproc") {
somdoron's avatar
somdoron committed
1028
        if (unregister_endpoint (std::string(addr_), this) == 0) {
1029
            EXIT_MUTEX ();
Martin Hurton's avatar
Martin Hurton committed
1030
            return 0;
somdoron's avatar
somdoron committed
1031
        }
1032 1033 1034
        std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
        if (range.first == range.second) {
            errno = ENOENT;
1035
            EXIT_MUTEX ();
1036 1037
            return -1;
        }
Martin Hurton's avatar
Martin Hurton committed
1038

1039
        for (inprocs_t::iterator it = range.first; it != range.second; ++it)
Martin Hurton's avatar
Martin Hurton committed
1040
            it->second->terminate (true);
1041
        inprocs.erase (range.first, range.second);
1042
        EXIT_MUTEX ();
1043 1044 1045
        return 0;
    }

1046 1047
    //  Find the endpoints range (if any) corresponding to the addr_ string.
    std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
1048 1049
    if (range.first == range.second) {
        errno = ENOENT;
1050
        EXIT_MUTEX ();
1051
        return -1;
1052
    }
1053

1054 1055 1056
    for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
        //  If we have an associated pipe, terminate it.
        if (it->second.second != NULL)
Martin Hurton's avatar
Martin Hurton committed
1057
            it->second.second->terminate (false);
1058
        term_child (it->second.first);
1059
    }
1060
    endpoints.erase (range.first, range.second);
1061
    EXIT_MUTEX ();
1062
    return 0;
1063 1064
}

1065
//  Sends msg_ through the socket-type specific xsend().
//  flags_ may contain ZMQ_SNDMORE (marks the message as having more parts)
//  and ZMQ_DONTWAIT (never block). Returns 0 on success, -1 on failure;
//  errno is set here to ETERM (context terminated), EFAULT (invalid
//  message) or EAGAIN (send timeout expired), otherwise it is whatever the
//  failing call left behind.
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
    ENTER_MUTEX ();

    //  Check whether the library hasn't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        EXIT_MUTEX ();
        return -1;
    }

    //  Check whether the message passed to the function is valid.
    if (unlikely (msg_ == NULL || !msg_->check ())) {
        errno = EFAULT;
        EXIT_MUTEX ();
        return -1;
    }

    //  Process pending commands, if any (throttled).
    if (unlikely (process_commands (0, true) != 0)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  Clear any user-visible flags that are set on the message, then
    //  impose the flags requested by the caller.
    msg_->reset_flags (msg_t::more);
    if (flags_ & ZMQ_SNDMORE)
        msg_->set_flags (msg_t::more);

    msg_->reset_metadata ();

    //  First attempt, using the method of the concrete socket class.
    int rc = xsend (msg_);
    if (rc == 0) {
        EXIT_MUTEX ();
        return 0;
    }
    if (unlikely (errno != EAGAIN)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  In case of non-blocking send we simply propagate the error -
    //  including EAGAIN - up the stack.
    const bool dont_wait = (flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0;
    if (dont_wait) {
        EXIT_MUTEX ();
        return -1;
    }

    //  Compute the time when the timeout should occur. A negative timeout
    //  means "wait forever", in which case the deadline is never consulted.
    int remaining = options.sndtimeo;
    uint64_t deadline = 0;
    if (remaining >= 0)
        deadline = clock.now_ms () + remaining;

    //  We couldn't send the message. Wait for the next command, process it
    //  and try to send the message again. If the timeout is reached in the
    //  meantime, fail with EAGAIN.
    for (;;) {
        if (unlikely (process_commands (remaining, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }

        rc = xsend (msg_);
        if (rc == 0)
            break;

        if (unlikely (errno != EAGAIN)) {
            EXIT_MUTEX ();
            return -1;
        }

        if (remaining > 0) {
            remaining = (int) (deadline - clock.now_ms ());
            if (remaining <= 0) {
                errno = EAGAIN;
                EXIT_MUTEX ();
                return -1;
            }
        }
    }

    EXIT_MUTEX ();
    return 0;
}

1151
//  Receives a message into msg_ through the socket-type specific xrecv().
//  With ZMQ_DONTWAIT in flags_ (or a zero receive timeout) the call never
//  blocks. Returns 0 on success; on failure a negative value is returned,
//  with errno set here to ETERM (context terminated), EFAULT (invalid
//  message) or EAGAIN (receive timeout expired), otherwise left as the
//  failing call set it.
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
    ENTER_MUTEX ();

    //  Check whether the library hasn't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        EXIT_MUTEX ();
        return -1;
    }

    //  Check whether the message passed to the function is valid.
    if (unlikely (msg_ == NULL || !msg_->check ())) {
        errno = EFAULT;
        EXIT_MUTEX ();
        return -1;
    }

    //  Once every inbound_poll_rate messages check for signals and process
    //  incoming commands. This happens only if we are not polling altogether
    //  because there are messages available all the time. If poll occurs,
    //  ticks is set to zero and thus we avoid this code.
    //
    //  Note that 'recv' uses a different command throttling algorithm (the
    //  one described above) from the one used by 'send'. This is because
    //  counting ticks is more efficient than doing RDTSC all the time.
    ++ticks;
    if (ticks == inbound_poll_rate) {
        if (unlikely (process_commands (0, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }
        ticks = 0;
    }

    //  Try to get a message straight away.
    int rc = xrecv (msg_);

    //  If we have the message, return immediately.
    if (rc == 0) {
        if (file_desc != retired_fd)
            msg_->set_fd (file_desc);
        extract_flags (msg_);
        EXIT_MUTEX ();
        return 0;
    }

    //  Anything other than "no message available" is a hard error.
    if (unlikely (errno != EAGAIN)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  If the message cannot be fetched immediately, there are two scenarios.
    //  For non-blocking recv, commands are processed in case there's an
    //  activate_reader command already waiting in a command pipe.
    //  If there is none, the second xrecv fails and its result is returned.
    const bool dont_wait = (flags_ & ZMQ_DONTWAIT) || options.rcvtimeo == 0;
    if (dont_wait) {
        if (unlikely (process_commands (0, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }
        ticks = 0;

        rc = xrecv (msg_);
        if (rc < 0) {
            EXIT_MUTEX ();
            return rc;
        }

        if (file_desc != retired_fd)
            msg_->set_fd (file_desc);
        extract_flags (msg_);
        EXIT_MUTEX ();
        return 0;
    }

    //  Compute the time when the timeout should occur. A negative timeout
    //  means "wait forever", in which case the deadline is never consulted.
    int remaining = options.rcvtimeo;
    uint64_t deadline = 0;
    if (remaining >= 0)
        deadline = clock.now_ms () + remaining;

    //  In the blocking scenario, commands are processed over and over again
    //  until we are able to fetch a message. The first pass only waits when
    //  ticks is non-zero; every later pass waits.
    bool wait_for_command = (ticks != 0);
    for (;;) {
        const int wait_ms = wait_for_command ? remaining : 0;
        if (unlikely (process_commands (wait_ms, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }

        rc = xrecv (msg_);
        if (rc == 0) {
            ticks = 0;
            break;
        }

        if (unlikely (errno != EAGAIN)) {
            EXIT_MUTEX ();
            return -1;
        }

        wait_for_command = true;
        if (remaining > 0) {
            remaining = (int) (deadline - clock.now_ms ());
            if (remaining <= 0) {
                errno = EAGAIN;
                EXIT_MUTEX ();
                return -1;
            }
        }
    }

    if (file_desc != retired_fd)
        msg_->set_fd (file_desc);
    extract_flags (msg_);
    EXIT_MUTEX ();
    return 0;
}

int zmq::socket_base_t::close ()
{
1267 1268
    //  Mark the socket as dead
    tag = 0xdeadbeef;
1269

1270 1271 1272 1273
    //  Transfer the ownership of the socket from this application thread
    //  to the reaper thread which will take care of the rest of shutdown
    //  process.
    send_reap (this);
1274

1275 1276 1277
    return 0;
}

1278 1279 1280 1281 1282 1283 1284 1285 1286 1287
//  Returns whatever the socket-type specific xhas_in() reports.
bool zmq::socket_base_t::has_in ()
{
    const bool readable = xhas_in ();
    return readable;
}

//  Returns whatever the socket-type specific xhas_out() reports.
bool zmq::socket_base_t::has_out ()
{
    const bool writable = xhas_out ();
    return writable;
}

1288
//  Registers the socket with the given poller (the reaper's poller, per
//  the comments below), then starts termination and checks whether the
//  socket can be deallocated right away.
//
//  For a thread-safe socket a dedicated signaler is created and attached to
//  the safe mailbox, and its file descriptor is the one handed to the
//  poller; otherwise the mailbox's own file descriptor is used.
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
    //  Plug the socket to the reaper thread.
    poller = poller_;

    fd_t fd;

    if (!thread_safe)
        fd = ((mailbox_t*) mailbox)->get_fd ();
    else {
        ENTER_MUTEX ();

        //  Allocate the signaler the same way the rest of this file
        //  allocates objects: non-throwing new followed by alloc_assert,
        //  so an out-of-memory condition is reported through the library's
        //  assertion path instead of an unexpected std::bad_alloc.
        reaper_signaler = new (std::nothrow) signaler_t ();
        alloc_assert (reaper_signaler);

        //  Add signaler to the safe mailbox
        fd = reaper_signaler->get_fd ();
        ((mailbox_safe_t*) mailbox)->add_signaler (reaper_signaler);

        //  Send a signal to make sure reaper handle existing commands
        reaper_signaler->send ();

        EXIT_MUTEX ();
    }

    handle = poller->add_fd (fd, this);
    poller->set_pollin (handle);

    //  Initialise the termination and check whether it can be deallocated
    //  immediately.
    terminate ();
    check_destroy ();
}

1321
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
Martin Sustrik's avatar
Martin Sustrik committed
1322
{
1323
    int rc;
1324
    command_t cmd;
1325 1326 1327
    if (timeout_ != 0) {

        //  If we are asked to wait, simply ask mailbox to wait.
somdoron's avatar
somdoron committed
1328
        rc = mailbox->recv (&cmd, timeout_);
1329 1330
    }
    else {
1331

1332 1333 1334
        //  If we are asked not to wait, check whether we haven't processed
        //  commands recently, so that we can throttle the new commands.

Martin Sustrik's avatar
Martin Sustrik committed
1335
        //  Get the CPU's tick counter. If 0, the counter is not available.
Martin Hurton's avatar
Martin Hurton committed
1336
        const uint64_t tsc = zmq::clock_t::rdtsc ();
Martin Sustrik's avatar
Martin Sustrik committed
1337

1338 1339 1340 1341 1342 1343
        //  Optimised version of command processing - it doesn't have to check
        //  for incoming commands each time. It does so only if certain time
        //  elapsed since last command processing. Command delay varies
        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
        //  etc. The optimisation makes sense only on platforms where getting
        //  a timestamp is a very cheap operation (tens of nanoseconds).
Martin Sustrik's avatar
Martin Sustrik committed
1344 1345
        if (tsc && throttle_) {

Martin Sustrik's avatar
Martin Sustrik committed
1346 1347 1348
            //  Check whether TSC haven't jumped backwards (in case of migration
            //  between CPU cores) and whether certain time have elapsed since
            //  last command processing. If it didn't do nothing.
Martin Sustrik's avatar
Martin Sustrik committed
1349
            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
1350
                return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1351
            last_tsc = tsc;
1352 1353 1354
        }

        //  Check whether there are any commands pending for this thread.
somdoron's avatar
somdoron committed
1355
        rc = mailbox->recv (&cmd, 0);
1356
    }
Martin Sustrik's avatar
Martin Sustrik committed
1357

1358 1359
    //  Process all available commands.
    while (rc == 0) {
1360
        cmd.destination->process_command (cmd);
somdoron's avatar
somdoron committed
1361
        rc = mailbox->recv (&cmd, 0);
1362 1363 1364 1365 1366 1367
    }

    if (errno == EINTR)
        return -1;

    zmq_assert (errno == EAGAIN);
1368

1369
    if (ctx_terminated) {
1370 1371
        errno = ETERM;
        return -1;
1372
    }
1373 1374

    return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1375 1376
}

1377
void zmq::socket_base_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
1378
{
1379
    //  Here, someone have called zmq_ctx_term while the socket was still alive.
1380
    //  We'll remember the fact so that any blocking call is interrupted and any
1381 1382
    //  further attempt to use the socket will return ETERM. The user is still
    //  responsible for calling zmq_close on the socket though!
1383
    stop_monitor ();
1384
    ctx_terminated = true;
Martin Sustrik's avatar
Martin Sustrik committed
1385 1386
}

1387
//  Handles the bind command by attaching the supplied pipe to this socket.
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
{
    attach_pipe (pipe_);
}

1392
void zmq::socket_base_t::process_term (int linger_)
1393
{
1394 1395 1396 1397 1398
    //  Unregister all inproc endpoints associated with this socket.
    //  Doing this we make sure that no new pipes from other sockets (inproc)
    //  will be initiated.
    unregister_endpoints (this);

1399 1400
    //  Ask all attached pipes to terminate.
    for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
1401
        pipes [i]->terminate (false);
Martin Sustrik's avatar
Martin Sustrik committed
1402
    register_term_acks ((int) pipes.size ());
1403

1404
    //  Continue the termination process immediately.
1405
    own_t::process_term (linger_);
1406 1407
}

1408 1409
void zmq::socket_base_t::update_pipe_options(int option_)
{
1410 1411 1412 1413 1414 1415 1416
    if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM)
    {
        for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
        {
            pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
        }
    }
1417 1418 1419

}

1420 1421 1422 1423 1424
//  Only records that destruction was requested; the actual deallocation is
//  carried out by check_destroy() once it sees the flag.
void zmq::socket_base_t::process_destroy ()
{
    destroyed = true;
}

1425
//  Base implementation: no option is accepted. Sets errno to EINVAL and
//  returns -1.
int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
{
    errno = EINVAL;
    return -1;
}
1430 1431 1432 1433 1434 1435

//  Base implementation: always reports that nothing can be sent.
bool zmq::socket_base_t::xhas_out ()
{
    return false;
}

1436
//  Base implementation: sending is not supported. Sets errno to ENOTSUP
//  and returns -1.
int zmq::socket_base_t::xsend (msg_t *)
{
    errno = ENOTSUP;
    return -1;
}

//  Base implementation: always reports that nothing can be received.
bool zmq::socket_base_t::xhas_in ()
{
    return false;
}

somdoron's avatar
somdoron committed
1447 1448
int zmq::socket_base_t::xjoin (const char *group_)
{
1449
    LIBZMQ_UNUSED (group_);
somdoron's avatar
somdoron committed
1450 1451 1452 1453 1454 1455
    errno = ENOTSUP;
    return -1;
}

int zmq::socket_base_t::xleave (const char *group_)
{
1456
    LIBZMQ_UNUSED (group_);
somdoron's avatar
somdoron committed
1457 1458 1459 1460
    errno = ENOTSUP;
    return -1;
}

1461
//  Base implementation: receiving is not supported. Sets errno to ENOTSUP
//  and returns -1.
int zmq::socket_base_t::xrecv (msg_t *)
{
    errno = ENOTSUP;
    return -1;
}

1467 1468 1469 1470 1471
//  Base implementation: returns a default-constructed (empty) blob.
zmq::blob_t zmq::socket_base_t::get_credential () const
{
    const blob_t empty_credential;
    return empty_credential;
}

1472
//  Base implementation: must never be reached; asserts unconditionally.
void zmq::socket_base_t::xread_activated (pipe_t *)
{
    zmq_assert (false);
}
1476
//  Base implementation: must never be reached; asserts unconditionally.
void zmq::socket_base_t::xwrite_activated (pipe_t *)
{
    zmq_assert (false);
}

1481
//  Base implementation: must never be reached; asserts unconditionally.
void zmq::socket_base_t::xhiccuped (pipe_t *)
{
    zmq_assert (false);
}

1486 1487
void zmq::socket_base_t::in_event ()
{
1488 1489 1490 1491
    //  This function is invoked only once the socket is running in the context
    //  of the reaper thread. Process any commands from other threads/sockets
    //  that may be available at the moment. Ultimately, the socket will
    //  be destroyed.
1492
    ENTER_MUTEX ();
1493

1494 1495 1496
    //  If the socket is thread safe we need to unsignal the reaper signaler
    if (thread_safe)
        reaper_signaler->recv();
1497

1498
    process_commands (0, false);
1499
    EXIT_MUTEX ();
1500 1501 1502 1503 1504 1505 1506 1507
    check_destroy ();
}

//  Writable events are never expected for a socket; asserts
//  unconditionally.
void zmq::socket_base_t::out_event ()
{
    zmq_assert (false);
}

1508
void zmq::socket_base_t::timer_event (int)
1509 1510 1511
{
    zmq_assert (false);
}
1512

1513 1514
void zmq::socket_base_t::check_destroy ()
{
1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530
    //  If the object was already marked as destroyed, finish the deallocation.
    if (destroyed) {

        //  Remove the socket from the reaper's poller.
        poller->rm_fd (handle);

        //  Remove the socket from the context.
        destroy_socket (this);

        //  Notify the reaper about the fact.
        send_reaped ();

        //  Deallocate.
        own_t::process_destroy ();
    }
}
1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541

//  Forwards the read-activation notification for pipe_ to the socket-type
//  specific xread_activated().
void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
    xread_activated (pipe_);
}

//  Forwards the write-activation notification for pipe_ to the socket-type
//  specific xwrite_activated().
void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
    xwrite_activated (pipe_);
}

1542 1543
//  Reacts to a hiccup on pipe_: when options.immediate is 1 the pipe is
//  terminated, otherwise the socket-type specific xhiccuped() is notified.
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
{
    if (options.immediate != 1) {
        //  Notify derived sockets of the hiccup
        xhiccuped (pipe_);
        return;
    }

    pipe_->terminate (false);
}

1551
//  Called when pipe_ has terminated: informs the concrete socket type,
//  forgets the pipe in the inproc and attached-pipe bookkeeping, and
//  acknowledges the termination if the socket itself is shutting down.
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
{
    //  Notify the specific socket type about the pipe termination.
    xpipe_terminated (pipe_);

    //  Remove the pipe from the inproc pipes: only the first entry that
    //  refers to it is erased.
    inprocs_t::iterator match = inprocs.begin ();
    while (match != inprocs.end () && match->second != pipe_)
        ++match;
    if (match != inprocs.end ())
        inprocs.erase (match);

    //  Remove the pipe from the list of attached pipes and confirm its
    //  termination if we are already shutting down.
    pipes.erase (pipe_);
    if (is_terminating ())
        unregister_term_ack ();
}

1570 1571
//  Inspects the flags of a freshly received message: asserts that an
//  identity frame is only seen when options.recv_identity is set, and
//  records in rcvmore whether the 'more' flag is present.
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
    //  Test whether IDENTITY flag is valid for this socket type.
    if (unlikely (msg_->flags () & msg_t::identity)) {
        zmq_assert (options.recv_identity);
    }

    //  Remember whether more message parts are to follow.
    rcvmore = (msg_->flags () & msg_t::more) != 0;
}
1579

1580
int zmq::socket_base_t::monitor (const char *addr_, int events_)
1581
{
1582 1583 1584 1585
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }
1586
    //  Support deregistering monitoring endpoints as well
1587 1588 1589 1590 1591 1592 1593
    if (addr_ == NULL) {
        stop_monitor ();
        return 0;
    }
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
Pieter Hintjens's avatar
Pieter Hintjens committed
1594
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
1595 1596
        return -1;

1597
    //  Event notification only supported over inproc://
1598 1599 1600 1601
    if (protocol != "inproc") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
1602 1603 1604 1605
    // already monitoring. Stop previous monitor before starting new one.
    if (monitor_socket != NULL) {
        stop_monitor (true);
    }
1606
    //  Register events to monitor
1607
    monitor_events = events_;
Martin Hurton's avatar
Martin Hurton committed
1608
    monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
1609 1610 1611
    if (monitor_socket == NULL)
        return -1;

1612
    //  Never block context termination on pending event messages
1613
    int linger = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
1614
    int rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1615
    if (rc == -1)
1616
        stop_monitor (false);
1617

1618
    //  Spawn the monitor socket endpoint
1619 1620
    rc = zmq_bind (monitor_socket, addr_);
    if (rc == -1)
1621
         stop_monitor (false);
1622 1623 1624
    return rc;
}

1625 1626 1627 1628 1629 1630 1631 1632 1633 1634
//  Stores fd_ as the socket's file descriptor (file_desc).
void zmq::socket_base_t::set_fd (zmq::fd_t fd_)
{
    file_desc = fd_;
}

//  Returns the file descriptor last stored with set_fd().
zmq::fd_t zmq::socket_base_t::fd ()
{
    return file_desc;
}

Martin Hurton's avatar
Martin Hurton committed
1635
void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_)
1636
{
1637 1638
    if (monitor_events & ZMQ_EVENT_CONNECTED)
        monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_);
1639
}
1640

Martin Hurton's avatar
Martin Hurton committed
1641
void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_)
1642
{
1643 1644
    if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED)
        monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_);
1645
}
1646

Martin Hurton's avatar
Martin Hurton committed
1647
void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_)
1648
{
1649 1650
    if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED)
        monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_);
1651 1652
}

Martin Hurton's avatar
Martin Hurton committed
1653
void zmq::socket_base_t::event_listening (const std::string &addr_, int fd_)
1654
{
1655 1656
    if (monitor_events & ZMQ_EVENT_LISTENING)
        monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_);
1657 1658
}

Martin Hurton's avatar
Martin Hurton committed
1659
void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
1660
{
1661 1662
    if (monitor_events & ZMQ_EVENT_BIND_FAILED)
        monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_);
1663 1664
}

Martin Hurton's avatar
Martin Hurton committed
1665
void zmq::socket_base_t::event_accepted (const std::string &addr_, int fd_)
1666
{
1667 1668
    if (monitor_events & ZMQ_EVENT_ACCEPTED)
        monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_);
1669 1670
}

Martin Hurton's avatar
Martin Hurton committed
1671
void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_)
1672
{
1673 1674
    if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED)
        monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_);
1675 1676
}

Martin Hurton's avatar
Martin Hurton committed
1677
void zmq::socket_base_t::event_closed (const std::string &addr_, int fd_)
1678
{
1679 1680
    if (monitor_events & ZMQ_EVENT_CLOSED)
        monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_);
1681
}
Martin Hurton's avatar
Martin Hurton committed
1682 1683

void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
1684
{
1685 1686
    if (monitor_events & ZMQ_EVENT_CLOSE_FAILED)
        monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_);
1687 1688
}

Martin Hurton's avatar
Martin Hurton committed
1689
void zmq::socket_base_t::event_disconnected (const std::string &addr_, int fd_)
1690
{
1691 1692
    if (monitor_events & ZMQ_EVENT_DISCONNECTED)
        monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_);
1693 1694
}

1695 1696
//  Send a monitor event
void zmq::socket_base_t::monitor_event (int event_, int value_, const std::string &addr_)
1697
{
1698
    if (monitor_socket) {
1699
        //  Send event in first frame
1700
        zmq_msg_t msg;
1701 1702
        zmq_msg_init_size (&msg, 6);
        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
1703 1704 1705 1706 1707
        //  Avoid dereferencing uint32_t on unaligned address
        uint16_t event = (uint16_t) event_;
        uint32_t value = (uint32_t) value_;
        memcpy (data + 0, &event, sizeof(event));
        memcpy (data + 2, &value, sizeof(value));
1708
        zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
1709 1710

        //  Send address in second frame
1711
        zmq_msg_init_size (&msg, addr_.size());
1712
        memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
1713 1714
        zmq_sendmsg (monitor_socket, &msg, 0);
    }
1715 1716
}

1717
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
1718 1719
{
    if (monitor_socket) {
1720
        if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_)
1721
            monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
1722 1723 1724 1725
        zmq_close (monitor_socket);
        monitor_socket = NULL;
        monitor_events = 0;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
1726
}