socket_base.cpp 51.1 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include <new>
32
#include <string>
33 34
#include <algorithm>

35
#include "macros.hpp"
36 37 38

#if defined ZMQ_HAVE_WINDOWS
#if defined _MSC_VER
39
#if defined _WIN32_WCE
boris@boressoft.ru's avatar
boris@boressoft.ru committed
40 41
#include <cmnintrin.h>
#else
42 43
#include <intrin.h>
#endif
boris@boressoft.ru's avatar
boris@boressoft.ru committed
44
#endif
45 46
#else
#include <unistd.h>
47
#include <ctype.h>
48
#endif
49

50
#include "socket_base.hpp"
51
#include "tcp_listener.hpp"
52
#include "ipc_listener.hpp"
53
#include "tipc_listener.hpp"
54
#include "tcp_connecter.hpp"
55
#include "io_thread.hpp"
56
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
57
#include "config.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
58
#include "pipe.hpp"
59
#include "err.hpp"
60
#include "ctx.hpp"
61
#include "likely.hpp"
62
#include "msg.hpp"
63 64 65
#include "address.hpp"
#include "ipc_address.hpp"
#include "tcp_address.hpp"
66
#include "udp_address.hpp"
67
#include "tipc_address.hpp"
somdoron's avatar
somdoron committed
68 69
#include "mailbox.hpp"
#include "mailbox_safe.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
70 71 72 73 74 75

#if defined ZMQ_HAVE_VMCI
#include "vmci_address.hpp"
#include "vmci_listener.hpp"
#endif

76 77 78
#ifdef ZMQ_HAVE_OPENPGM
#include "pgm_socket.hpp"
#endif
79

80 81 82 83 84 85 86
#include "pair.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "pull.hpp"
#include "push.hpp"
87 88
#include "dealer.hpp"
#include "router.hpp"
89 90
#include "xpub.hpp"
#include "xsub.hpp"
91
#include "stream.hpp"
92
#include "server.hpp"
93
#include "client.hpp"
somdoron's avatar
somdoron committed
94 95
#include "radio.hpp"
#include "dish.hpp"
somdoron's avatar
somdoron committed
96 97
#include "gather.hpp"
#include "scatter.hpp"
98
#include "dgram.hpp"
99

somdoron's avatar
somdoron committed
100 101


102 103 104 105 106
bool zmq::socket_base_t::check_tag ()
{
    return tag == 0xbaddecaf;
}

107
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
108
    uint32_t tid_, int sid_)
109 110 111
{
    socket_base_t *s = NULL;
    switch (type_) {
Pieter Hintjens's avatar
Pieter Hintjens committed
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
        case ZMQ_PAIR:
            s = new (std::nothrow) pair_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUB:
            s = new (std::nothrow) pub_t (parent_, tid_, sid_);
            break;
        case ZMQ_SUB:
            s = new (std::nothrow) sub_t (parent_, tid_, sid_);
            break;
        case ZMQ_REQ:
            s = new (std::nothrow) req_t (parent_, tid_, sid_);
            break;
        case ZMQ_REP:
            s = new (std::nothrow) rep_t (parent_, tid_, sid_);
            break;
        case ZMQ_DEALER:
            s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
            break;
        case ZMQ_ROUTER:
            s = new (std::nothrow) router_t (parent_, tid_, sid_);
            break;
        case ZMQ_PULL:
            s = new (std::nothrow) pull_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUSH:
            s = new (std::nothrow) push_t (parent_, tid_, sid_);
            break;
        case ZMQ_XPUB:
            s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
            break;
        case ZMQ_XSUB:
            s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
            break;
        case ZMQ_STREAM:
            s = new (std::nothrow) stream_t (parent_, tid_, sid_);
            break;
148 149 150
        case ZMQ_SERVER:
            s = new (std::nothrow) server_t (parent_, tid_, sid_);
            break;
151 152 153
        case ZMQ_CLIENT:
            s = new (std::nothrow) client_t (parent_, tid_, sid_);
            break;
somdoron's avatar
somdoron committed
154 155 156 157 158 159
        case ZMQ_RADIO:
            s = new (std::nothrow) radio_t (parent_, tid_, sid_);
            break;
        case ZMQ_DISH:
            s = new (std::nothrow) dish_t (parent_, tid_, sid_);
            break;
somdoron's avatar
somdoron committed
160 161 162 163 164 165
        case ZMQ_GATHER:
            s = new (std::nothrow) gather_t (parent_, tid_, sid_);
            break;
        case ZMQ_SCATTER:
            s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
            break;
166 167 168
        case ZMQ_DGRAM:
            s = new (std::nothrow) dgram_t (parent_, tid_, sid_);
            break;
Pieter Hintjens's avatar
Pieter Hintjens committed
169 170 171
        default:
            errno = EINVAL;
            return NULL;
172
    }
173 174

    alloc_assert (s);
somdoron's avatar
somdoron committed
175

176
    if (s->mailbox == NULL) {
177
        s->destroyed = true;
178
        LIBZMQ_DELETE(s);
Pieter Hintjens's avatar
Pieter Hintjens committed
179
        return NULL;
180
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
181

182 183 184
    return s;
}

somdoron's avatar
somdoron committed
185
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
Martin Sustrik's avatar
Martin Sustrik committed
186
    own_t (parent_, tid_),
187
    tag (0xbaddecaf),
188
    ctx_terminated (false),
189
    destroyed (false),
190
    poller(NULL),
191
    handle((poller_t::handle_t)NULL),
Martin Sustrik's avatar
Martin Sustrik committed
192
    last_tsc (0),
Martin Sustrik's avatar
Martin Sustrik committed
193
    ticks (0),
194 195
    rcvmore (false),
    monitor_socket (NULL),
somdoron's avatar
somdoron committed
196
    monitor_events (0),
197
    thread_safe (thread_safe_),
198 199 200
    reaper_signaler (NULL),
    sync(),
    monitor_sync()
Martin Sustrik's avatar
Martin Sustrik committed
201
{
202
    options.socket_id = sid_;
203
    options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
204
    options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;
somdoron's avatar
somdoron committed
205 206

    if (thread_safe)
207
    {
208
        mailbox = new (std::nothrow) mailbox_safe_t(&sync);
209 210
        zmq_assert (mailbox);
    }
211
    else {
212
        mailbox_t *m = new (std::nothrow) mailbox_t();
213
        zmq_assert (m);
214

215 216 217 218 219 220 221
        if (m->get_fd () != retired_fd)
            mailbox = m;
        else {
            LIBZMQ_DELETE (m);
            mailbox = NULL;
        }
    }
222 223
}

224 225
int zmq::socket_base_t::get_peer_state (const void *routing_id_,
                                        size_t routing_id_size_) const
226
{
227 228
    LIBZMQ_UNUSED (routing_id_);
    LIBZMQ_UNUSED (routing_id_size_);
229

230 231 232 233 234
    //  Only ROUTER sockets support this
    errno = ENOTSUP;
    return -1;
}

235 236
zmq::socket_base_t::~socket_base_t ()
{
237 238
    if (mailbox)
        LIBZMQ_DELETE(mailbox);
Ilya Kulakov's avatar
Ilya Kulakov committed
239

240
    if (reaper_signaler)
241
        LIBZMQ_DELETE(reaper_signaler);
Ilya Kulakov's avatar
Ilya Kulakov committed
242

243
    scoped_lock_t lock(monitor_sync);
244
    stop_monitor ();
245

246
    zmq_assert (destroyed);
247 248
}

somdoron's avatar
somdoron committed
249
zmq::i_mailbox *zmq::socket_base_t::get_mailbox ()
250
{
somdoron's avatar
somdoron committed
251
    return mailbox;
252 253 254 255
}

void zmq::socket_base_t::stop ()
{
256 257
    //  Called by ctx when it is terminated (zmq_ctx_term).
    //  'stop' command is sent from the threads that called zmq_ctx_term to
258 259 260 261 262
    //  the thread owning the socket. This way, blocking call in the
    //  owner thread can be interrupted.
    send_stop ();
}

263 264 265 266 267 268 269 270 271 272 273 274 275
int zmq::socket_base_t::parse_uri (const char *uri_,
                        std::string &protocol_, std::string &address_)
{
    zmq_assert (uri_ != NULL);

    std::string uri (uri_);
    std::string::size_type pos = uri.find ("://");
    if (pos == std::string::npos) {
        errno = EINVAL;
        return -1;
    }
    protocol_ = uri.substr (0, pos);
    address_ = uri.substr (pos + 3);
Martin Hurton's avatar
Martin Hurton committed
276

277 278 279 280 281 282 283
    if (protocol_.empty () || address_.empty ()) {
        errno = EINVAL;
        return -1;
    }
    return 0;
}

284 285
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
286
    //  First check out whether the protocol is something we are aware of.
Pieter Hintjens's avatar
Pieter Hintjens committed
287
    if (protocol_ != "inproc"
288
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
Pieter Hintjens's avatar
Pieter Hintjens committed
289
    &&  protocol_ != "ipc"
290
#endif
Pieter Hintjens's avatar
Pieter Hintjens committed
291
    &&  protocol_ != "tcp"
292 293
#if defined ZMQ_HAVE_OPENPGM
    //  pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
Pieter Hintjens's avatar
Pieter Hintjens committed
294 295
    &&  protocol_ != "pgm"
    &&  protocol_ != "epgm"
296 297 298
#endif
#if defined ZMQ_HAVE_TIPC
    // TIPC transport is only available on Linux.
Pieter Hintjens's avatar
Pieter Hintjens committed
299
    &&  protocol_ != "tipc"
300
#endif
301 302
#if defined ZMQ_HAVE_NORM
    &&  protocol_ != "norm"
303
#endif
304 305
#if defined ZMQ_HAVE_VMCI
    &&  protocol_ != "vmci"
306
#endif
307
    &&  protocol_ != "udp") {
Ilya Kulakov's avatar
Ilya Kulakov committed
308 309 310 311
        errno = EPROTONOSUPPORT;
        return -1;
    }

312 313 314
    //  Check whether socket type and transport protocol match.
    //  Specifically, multicast protocols can't be combined with
    //  bi-directional messaging patterns (socket types).
315
#if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
bebopagogo's avatar
bebopagogo committed
316
    if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") &&
317 318
          options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
          options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
319 320 321
        errno = ENOCOMPATPROTO;
        return -1;
    }
322
#endif
323

324
    if (protocol_ == "udp" && (options.type != ZMQ_DISH &&
325 326
                               options.type != ZMQ_RADIO &&
                               options.type != ZMQ_DGRAM)) {
327
        errno = ENOCOMPATPROTO;
328
        return -1;
329
    }
330

331 332 333 334
    //  Protocol is available.
    return 0;
}

335
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
336
{
337 338 339
    //  First, register the pipe so that we can terminate it later on.
    pipe_->set_event_sink (this);
    pipes.push_back (pipe_);
340

341
    //  Let the derived socket type know about new pipe.
342
    xattach_pipe (pipe_, subscribe_to_all_);
343 344 345 346 347

    //  If the socket is already being closed, ask any new pipes to terminate
    //  straight away.
    if (is_terminating ()) {
        register_term_acks (1);
348
        pipe_->terminate (false);
349
    }
350 351
}

352
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
353
    size_t optvallen_)
354
{
355
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
356

357 358 359 360
    if (!options.is_valid(option_)) {
        errno = EINVAL;
        return -1;
    }
361

362
    if (unlikely (ctx_terminated)) {
363 364 365 366
        errno = ETERM;
        return -1;
    }

367 368
    //  First, check whether specific socket type overloads the option.
    int rc = xsetsockopt (option_, optval_, optvallen_);
somdoron's avatar
somdoron committed
369
    if (rc == 0 || errno != EINVAL) {
370
        return rc;
somdoron's avatar
somdoron committed
371
    }
372 373 374

    //  If the socket type doesn't support the option, pass it to
    //  the generic option parser.
somdoron's avatar
somdoron committed
375
    rc = options.setsockopt (option_, optval_, optvallen_);
376
    update_pipe_options(option_);
somdoron's avatar
somdoron committed
377 378

    return rc;
379 380
}

381 382 383
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
    size_t *optvallen_)
{
384
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
385

386
    if (unlikely (ctx_terminated)) {
387 388 389 390
        errno = ETERM;
        return -1;
    }

391
    if (option_ == ZMQ_RCVMORE) {
392
        if (*optvallen_ < sizeof (int)) {
393 394 395
            errno = EINVAL;
            return -1;
        }
396
        memset(optval_, 0, *optvallen_);
397 398
        *((int*) optval_) = rcvmore ? 1 : 0;
        *optvallen_ = sizeof (int);
399 400 401
        return 0;
    }

402 403 404 405 406
    if (option_ == ZMQ_FD) {
        if (*optvallen_ < sizeof (fd_t)) {
            errno = EINVAL;
            return -1;
        }
somdoron's avatar
somdoron committed
407 408 409 410 411 412

        if (thread_safe) {
            // thread safe socket doesn't provide file descriptor
            errno = EINVAL;
            return -1;
        }
413

somdoron's avatar
somdoron committed
414
        *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
415 416
        *optvallen_ = sizeof(fd_t);

417 418 419 420
        return 0;
    }

    if (option_ == ZMQ_EVENTS) {
421
        if (*optvallen_ < sizeof (int)) {
422 423 424
            errno = EINVAL;
            return -1;
        }
425
        int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
426
        if (rc != 0 && (errno == EINTR || errno == ETERM)) {
427
            return -1;
somdoron's avatar
somdoron committed
428
        }
429
        errno_assert (rc == 0);
430
        *((int*) optval_) = 0;
431
        if (has_out ())
432
            *((int*) optval_) |= ZMQ_POLLOUT;
433
        if (has_in ())
434 435
            *((int*) optval_) |= ZMQ_POLLIN;
        *optvallen_ = sizeof (int);
436 437 438
        return 0;
    }

439 440 441 442 443
    if (option_ == ZMQ_LAST_ENDPOINT) {
        if (*optvallen_ < last_endpoint.size () + 1) {
            errno = EINVAL;
            return -1;
        }
444
        strncpy(static_cast <char *> (optval_), last_endpoint.c_str(), last_endpoint.size() + 1);
445 446 447 448
        *optvallen_ = last_endpoint.size () + 1;
        return 0;
    }

449 450 451 452 453 454
    if (option_ == ZMQ_THREAD_SAFE) {
        if (*optvallen_ < sizeof (int)) {
            errno = EINVAL;
            return -1;
        }
        memset(optval_, 0, *optvallen_);
455
        *((int*) optval_) = thread_safe ? 1 : 0;
456 457
        *optvallen_ = sizeof (int);
        return 0;
Ilya Kulakov's avatar
Ilya Kulakov committed
458
    }
459

somdoron's avatar
somdoron committed
460 461
    int rc = options.getsockopt (option_, optval_, optvallen_);
    return rc;
462 463
}

somdoron's avatar
somdoron committed
464 465
int zmq::socket_base_t::join (const char* group_)
{
466
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
467 468 469 470 471 472 473 474 475

    int rc = xjoin (group_);


    return rc;
}

int zmq::socket_base_t::leave (const char* group_)
{
476
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
477 478 479 480 481 482 483

    int rc = xleave (group_);


    return rc;
}

484 485
int zmq::socket_base_t::add_signaler(signaler_t *s_)
{
486
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
487 488 489

    if (!thread_safe) {
        errno = EINVAL;
Ilya Kulakov's avatar
Ilya Kulakov committed
490
        return -1;
491 492 493 494 495 496 497 498 499
    }

    ((mailbox_safe_t*)mailbox)->add_signaler(s_);

    return 0;
}

int zmq::socket_base_t::remove_signaler(signaler_t *s_)
{
500
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
501 502 503 504 505 506 507 508 509 510 511

    if (!thread_safe) {
        errno = EINVAL;
        return -1;
    }

    ((mailbox_safe_t*)mailbox)->remove_signaler(s_);

    return 0;
}

512 513
int zmq::socket_base_t::bind (const char *addr_)
{
514
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
515

516
    if (unlikely (ctx_terminated)) {
517 518 519 520
        errno = ETERM;
        return -1;
    }

521 522
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
523
    if (unlikely (rc != 0)) {
524
        return -1;
somdoron's avatar
somdoron committed
525
    }
526

527
    //  Parse addr_ string.
528 529
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
530
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
531
        return -1;
somdoron's avatar
somdoron committed
532
    }
533

Pieter Hintjens's avatar
Pieter Hintjens committed
534
    if (protocol == "inproc") {
535
        const endpoint_t endpoint = { this, options };
536
        rc = register_endpoint (addr_, endpoint);
537
        if (rc == 0) {
Martin Hurton's avatar
Martin Hurton committed
538
            connect_pending (addr_, this);
539
            last_endpoint.assign (addr_);
540
            options.connected = true;
541 542
        }
        return rc;
543
    }
544

545
    if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
546
        //  For convenience's sake, bind can be used interchangeable with
547
        //  connect for PGM, EPGM, NORM transports.
548 549 550 551
        rc = connect (addr_);
        if (rc != -1)
            options.connected = true;
        return rc;
552 553
    }

554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607
    if (protocol == "udp") {
        if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
            errno = ENOCOMPATPROTO;
            return -1;
        }

        //  Choose the I/O thread to run the session in.
        io_thread_t *io_thread = choose_io_thread (options.affinity);
        if (!io_thread) {
            errno = EMTHREAD;
            return -1;
        }

        address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ());
        alloc_assert (paddr);

        paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
        alloc_assert (paddr->resolved.udp_addr);
        rc = paddr->resolved.udp_addr->resolve (address.c_str(), true);
        if (rc != 0) {
            LIBZMQ_DELETE(paddr);
            return -1;
        }

        session_base_t *session = session_base_t::create (io_thread, true, this,
            options, paddr);
        errno_assert (session);

        pipe_t *newpipe = NULL;

        //  Create a bi-directional pipe.
        object_t *parents [2] = {this, session};
        pipe_t *new_pipes [2] = {NULL, NULL};

        int hwms [2] = {options.sndhwm, options.rcvhwm};
        bool conflates [2] = {false, false};
        rc = pipepair (parents, new_pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  Attach local end of the pipe to the socket object.
        attach_pipe (new_pipes [0], true);
        newpipe = new_pipes [0];

        //  Attach remote end of the pipe to the session object later on.
        session->attach_pipe (new_pipes [1]);

        //  Save last endpoint URI
        paddr->to_string (last_endpoint);

        add_endpoint (addr_, (own_t *) session, newpipe);

        return 0;
    }

608
    //  Remaining transports require to be run in an I/O thread, so at this
609 610 611 612 613 614
    //  point we'll choose one.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        return -1;
    }
615

616
    if (protocol == "tcp") {
617
        tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
618
            io_thread, this, options);
619
        alloc_assert (listener);
620
        rc = listener->set_address (address.c_str ());
621
        if (rc != 0) {
622
            LIBZMQ_DELETE(listener);
623
            event_bind_failed (address, zmq_errno());
624
            return -1;
625
        }
626

627
        // Save last endpoint URI
628
        listener->get_address (last_endpoint);
629

630
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
631
        options.connected = true;
632 633 634
        return 0;
    }

635
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
636 637 638 639
    if (protocol == "ipc") {
        ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
640
        int rc = listener->set_address (address.c_str ());
641
        if (rc != 0) {
642
            LIBZMQ_DELETE(listener);
643
            event_bind_failed (address, zmq_errno());
644 645
            return -1;
        }
646

647
        // Save last endpoint URI
648
        listener->get_address (last_endpoint);
649

650
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
651
        options.connected = true;
652
        return 0;
653
    }
654
#endif
655
#if defined ZMQ_HAVE_TIPC
656 657 658 659 660 661
    if (protocol == "tipc") {
         tipc_listener_t *listener = new (std::nothrow) tipc_listener_t (
              io_thread, this, options);
         alloc_assert (listener);
         int rc = listener->set_address (address.c_str ());
         if (rc != 0) {
662
             LIBZMQ_DELETE(listener);
663 664 665 666 667 668 669 670
             event_bind_failed (address, zmq_errno());
             return -1;
         }

        // Save last endpoint URI
        listener->get_address (last_endpoint);

        add_endpoint (addr_, (own_t *) listener, NULL);
671
        options.connected = true;
672 673 674
        return 0;
    }
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693
#if defined ZMQ_HAVE_VMCI
    if (protocol == "vmci") {
        vmci_listener_t *listener = new (std::nothrow) vmci_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
        int rc = listener->set_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(listener);
            event_bind_failed (address, zmq_errno ());
            return -1;
        }

        listener->get_address (last_endpoint);

        add_endpoint (last_endpoint.c_str(), (own_t *) listener, NULL);
        options.connected = true;
        return 0;
    }
#endif
694

695
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
696
    return -1;
697 698
}

699
int zmq::socket_base_t::connect (const char *addr_)
700
{
701
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
702

703
    if (unlikely (ctx_terminated)) {
704 705 706 707
        errno = ETERM;
        return -1;
    }

708 709
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
710
    if (unlikely (rc != 0)) {
711
        return -1;
somdoron's avatar
somdoron committed
712
    }
713

malosek's avatar
malosek committed
714
    //  Parse addr_ string.
715 716
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
717
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
718
        return -1;
somdoron's avatar
somdoron committed
719
    }
malosek's avatar
malosek committed
720

Pieter Hintjens's avatar
Pieter Hintjens committed
721
    if (protocol == "inproc") {
722

723 724 725 726
        //  TODO: inproc connect is specific with respect to creating pipes
        //  as there's no 'reconnect' functionality implemented. Once that
        //  is in place we should follow generic pipe creation algorithm.

727 728
        //  Find the peer endpoint.
        endpoint_t peer = find_endpoint (addr_);
729

730
        // The total HWM for an inproc connection should be the sum of
731
        // the binder's HWM and the connector's HWM.
Martin Hurton's avatar
Martin Hurton committed
732
        int sndhwm = 0;
733 734 735
        if (peer.socket == NULL)
            sndhwm = options.sndhwm;
        else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
736
            sndhwm = options.sndhwm + peer.options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
737
        int rcvhwm = 0;
738 739
        if (peer.socket == NULL)
            rcvhwm = options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
740 741
        else
        if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
742
            rcvhwm = options.rcvhwm + peer.options.sndhwm;
743

744
        //  Create a bi-directional pipe to connect the peers.
745
        object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
746
        pipe_t *new_pipes [2] = {NULL, NULL};
747 748 749 750 751 752 753 754 755 756

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
        bool conflates [2] = {conflate, conflate};
757
        rc = pipepair (parents, new_pipes, hwms, conflates);
758 759 760 761 762
        if (!conflate) {
            new_pipes[0]->set_hwms_boost(peer.options.sndhwm, peer.options.rcvhwm);
            new_pipes[1]->set_hwms_boost(options.sndhwm, options.rcvhwm);
        }

763
        errno_assert (rc == 0);
764

765 766
        if (!peer.socket) {
            //  The peer doesn't exist yet so we don't know whether
767 768
            //  to send the routing id message or not. To resolve this,
            //  we always send our routing id and drop it later if
769 770
            //  the peer doesn't expect it.
            msg_t id;
771
            rc = id.init_size (options.routing_id_size);
772
            errno_assert (rc == 0);
773 774
            memcpy (id.data (), options.routing_id, options.routing_id_size);
            id.set_flags (msg_t::routing_id);
775 776 777 778
            bool written = new_pipes [0]->write (&id);
            zmq_assert (written);
            new_pipes [0]->flush ();

Martin Hurton's avatar
Martin Hurton committed
779 780
            const endpoint_t endpoint = {this, options};
            pend_connection (std::string (addr_), endpoint, new_pipes);
781
        }
Martin Hurton's avatar
Martin Hurton committed
782
        else {
783 784
            //  If required, send the routing id of the local socket to the peer.
            if (peer.options.recv_routing_id) {
785
                msg_t id;
786
                rc = id.init_size (options.routing_id_size);
787
                errno_assert (rc == 0);
788 789
                memcpy (id.data (), options.routing_id, options.routing_id_size);
                id.set_flags (msg_t::routing_id);
790 791 792 793
                bool written = new_pipes [0]->write (&id);
                zmq_assert (written);
                new_pipes [0]->flush ();
            }
794

795 796
            //  If required, send the routing id of the peer to the local socket.
            if (options.recv_routing_id) {
797
                msg_t id;
798
                rc = id.init_size (peer.options.routing_id_size);
799
                errno_assert (rc == 0);
800 801
                memcpy (id.data (), peer.options.routing_id, peer.options.routing_id_size);
                id.set_flags (msg_t::routing_id);
802 803 804 805
                bool written = new_pipes [1]->write (&id);
                zmq_assert (written);
                new_pipes [1]->flush ();
            }
806

807 808 809 810 811
            //  Attach remote end of the pipe to the peer socket. Note that peer's
            //  seqnum was incremented in find_endpoint function. We don't need it
            //  increased here.
            send_bind (peer.socket, new_pipes [1], false);
        }
812

Martin Hurton's avatar
Martin Hurton committed
813 814 815
        //  Attach local end of the pipe to this socket object.
        attach_pipe (new_pipes [0]);

816
        // Save last endpoint URI
817
        last_endpoint.assign (addr_);
818

819
        // remember inproc connections for disconnect
820
        inprocs.ZMQ_MAP_INSERT_OR_EMPLACE (addr_, new_pipes [0]);
821

822
        options.connected = true;
823 824
        return 0;
    }
825
    bool is_single_connect = (options.type == ZMQ_DEALER ||
826
                              options.type == ZMQ_SUB    || options.type == ZMQ_PUB ||
827 828
                              options.type == ZMQ_REQ);
    if (unlikely (is_single_connect)) {
Martin Hurton's avatar
Martin Hurton committed
829
        const endpoints_t::iterator it = endpoints.find (addr_);
830 831 832 833 834 835 836
        if (it != endpoints.end ()) {
            // There is no valid use for multiple connects for SUB-PUB nor
            // DEALER-ROUTER nor REQ-REP. Multiple connects produces
            // nonsensical results.
            return 0;
        }
    }
837

838 839 840 841 842 843 844
    //  Choose the I/O thread to run the session in.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        return -1;
    }

Ilya Kulakov's avatar
Ilya Kulakov committed
845
    address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ());
846
    alloc_assert (paddr);
847 848 849

    //  Resolve address (if needed by the protocol)
    if (protocol == "tcp") {
850 851 852
        //  Do some basic sanity checks on tcp:// address syntax
        //  - hostname starts with digit or letter, with embedded '-' or '.'
        //  - IPv6 address may contain hex chars and colons.
853 854
        //  - IPv6 link local address may contain % followed by interface name / zone_id
        //    (Reference: https://tools.ietf.org/html/rfc4007)
855 856 857 858 859 860
        //  - IPv4 address may contain decimal digits and dots.
        //  - Address must end in ":port" where port is *, or numeric
        //  - Address may contain two parts separated by ':'
        //  Following code is quick and dirty check to catch obvious errors,
        //  without trying to be fully accurate.
        const char *check = address.c_str ();
861
        if (isalnum (*check) || isxdigit (*check) || *check == '[' || *check == ':') {
862 863 864
            check++;
            while (isalnum  (*check)
                || isxdigit (*check)
865
                || *check == '.' || *check == '-' || *check == ':' || *check == '%'
866
                || *check == ';' || *check == '['  || *check == ']' || *check == '_'
867
                || *check == '*'
868
            ) {
869
                check++;
870
            }
871 872 873 874 875 876 877 878 879 880 881 882 883 884
        }
        //  Assume the worst, now look for success
        rc = -1;
        //  Did we reach the end of the address safely?
        if (*check == 0) {
            //  Do we have a valid port string? (cannot be '*' in connect
            check = strrchr (address.c_str (), ':');
            if (check) {
                check++;
                if (*check && (isdigit (*check)))
                    rc = 0;     //  Valid
            }
        }
        if (rc == -1) {
885
            errno = EINVAL;
886
            LIBZMQ_DELETE(paddr);
887 888 889
            return -1;
        }
        //  Defer resolution until a socket is opened
890
        paddr->resolved.tcp_addr = NULL;
891
    }
892
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
893 894
    else
    if (protocol == "ipc") {
895
        paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
896
        alloc_assert (paddr->resolved.ipc_addr);
897 898
        int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
        if (rc != 0) {
899
            LIBZMQ_DELETE(paddr);
900 901 902
            return -1;
        }
    }
903
#endif
904

905 906 907 908 909 910
    if (protocol  == "udp") {
        if (options.type != ZMQ_RADIO) {
            errno = ENOCOMPATPROTO;
            LIBZMQ_DELETE(paddr);
            return -1;
        }
911

912 913 914 915 916 917 918
        paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
        alloc_assert (paddr->resolved.udp_addr);
        rc = paddr->resolved.udp_addr->resolve (address.c_str(), false);
        if (rc != 0) {
            LIBZMQ_DELETE(paddr);
            return -1;
        }
919 920
    }

bebopagogo's avatar
bebopagogo committed
921
// TBD - Should we check address for ZMQ_HAVE_NORM???
922

923 924 925 926 927 928 929
#ifdef ZMQ_HAVE_OPENPGM
    if (protocol == "pgm" || protocol == "epgm") {
        struct pgm_addrinfo_t *res = NULL;
        uint16_t port_number = 0;
        int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number);
        if (res != NULL)
            pgm_freeaddrinfo (res);
930 931 932
        if (rc != 0 || port_number == 0) {
          return -1;
        }
933
    }
934
#endif
935
#if defined ZMQ_HAVE_TIPC
936 937 938 939 940 941
    else
    if (protocol == "tipc") {
        paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
        alloc_assert (paddr->resolved.tipc_addr);
        int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
        if (rc != 0) {
942
            LIBZMQ_DELETE(paddr);
943 944 945 946
            return -1;
        }
    }
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
947 948 949 950 951 952 953 954 955 956 957 958
#if defined ZMQ_HAVE_VMCI
    else
    if (protocol == "vmci") {
        paddr->resolved.vmci_addr = new (std::nothrow) vmci_address_t (this->get_ctx ());
        alloc_assert (paddr->resolved.vmci_addr);
        int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(paddr);
            return -1;
        }
    }
#endif
959

960
    //  Create session.
961
    session_base_t *session = session_base_t::create (io_thread, true, this,
962
        options, paddr);
963
    errno_assert (session);
964

965
    //  PGM does not support subscription forwarding; ask for all data to be
bebopagogo's avatar
bebopagogo committed
966
    //  sent to this pipe. (same for NORM, currently?)
967
    bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp";
968
    pipe_t *newpipe = NULL;
969

970
    if (options.immediate != 1 || subscribe_to_all) {
971 972
        //  Create a bi-directional pipe.
        object_t *parents [2] = {this, session};
973
        pipe_t *new_pipes [2] = {NULL, NULL};
974 975 976 977 978 979 980 981 982 983 984

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : options.sndhwm,
            conflate? -1 : options.rcvhwm};
        bool conflates [2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
985
        rc = pipepair (parents, new_pipes, hwms, conflates);
986
        errno_assert (rc == 0);
987

988
        //  Attach local end of the pipe to the socket object.
989
        attach_pipe (new_pipes [0], subscribe_to_all);
990
        newpipe = new_pipes [0];
Martin Sustrik's avatar
Martin Sustrik committed
991

992
        //  Attach remote end of the pipe to the session object later on.
993
        session->attach_pipe (new_pipes [1]);
994 995 996
    }

    //  Save last endpoint URI
997
    paddr->to_string (last_endpoint);
998

999
    add_endpoint (addr_, (own_t *) session, newpipe);
1000 1001 1002
    return 0;
}

1003
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
1004
{
1005
    //  Activate the session. Make it a child of this socket.
1006
    launch_child (endpoint_);
1007
    endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (addr_, endpoint_pipe_t (endpoint_, pipe));
1008 1009 1010 1011
}

int zmq::socket_base_t::term_endpoint (const char *addr_)
{
1012
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
1013

1014 1015 1016 1017 1018
    //  Check whether the library haven't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }
malosek's avatar
malosek committed
1019

1020
    //  Check whether endpoint address passed to the function is valid.
1021 1022 1023 1024 1025
    if (unlikely (!addr_)) {
        errno = EINVAL;
        return -1;
    }

1026 1027 1028
    //  Process pending commands, if any, since there could be pending unprocessed process_own()'s
    //  (from launch_child() for example) we're asked to terminate now.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
1029
    if (unlikely(rc != 0)) {
1030
        return -1;
somdoron's avatar
somdoron committed
1031
    }
1032

1033 1034 1035
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
1036
    if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) {
1037
        return -1;
somdoron's avatar
somdoron committed
1038
    }
1039 1040 1041

    // Disconnect an inproc socket
    if (protocol == "inproc") {
somdoron's avatar
somdoron committed
1042
        if (unregister_endpoint (std::string(addr_), this) == 0) {
Martin Hurton's avatar
Martin Hurton committed
1043
            return 0;
somdoron's avatar
somdoron committed
1044
        }
1045 1046 1047 1048 1049
        std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
        if (range.first == range.second) {
            errno = ENOENT;
            return -1;
        }
Martin Hurton's avatar
Martin Hurton committed
1050

1051
        for (inprocs_t::iterator it = range.first; it != range.second; ++it)
Martin Hurton's avatar
Martin Hurton committed
1052
            it->second->terminate (true);
1053 1054 1055 1056
        inprocs.erase (range.first, range.second);
        return 0;
    }

1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086
    std::string resolved_addr = std::string (addr_);
    std::pair <endpoints_t::iterator, endpoints_t::iterator> range;

    // The resolved last_endpoint is used as a key in the endpoints map.
    // The address passed by the user might not match in the TCP case due to
    // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
    // resolve before giving up. Given at this stage we don't know whether a
    // socket is connected or bound, try with both.
    if (protocol == "tcp") {
        range = endpoints.equal_range (resolved_addr);
        if (range.first == range.second) {
            tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
            alloc_assert (tcp_addr);
            rc = tcp_addr->resolve (address.c_str (), false, options.ipv6);

            if (rc == 0) {
                tcp_addr->to_string (resolved_addr);
                range = endpoints.equal_range (resolved_addr);

                if (range.first == range.second) {
                    rc = tcp_addr->resolve (address.c_str (), true, options.ipv6);
                    if (rc == 0) {
                        tcp_addr->to_string (resolved_addr);
                    }
                }
            }
            LIBZMQ_DELETE(tcp_addr);
        }
    }

1087
    //  Find the endpoints range (if any) corresponding to the addr_ string.
1088
    range = endpoints.equal_range (resolved_addr);
1089 1090
    if (range.first == range.second) {
        errno = ENOENT;
1091
        return -1;
1092
    }
1093

1094 1095 1096
    for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
        //  If we have an associated pipe, terminate it.
        if (it->second.second != NULL)
Martin Hurton's avatar
Martin Hurton committed
1097
            it->second.second->terminate (false);
1098
        term_child (it->second.first);
1099
    }
1100
    endpoints.erase (range.first, range.second);
1101
    return 0;
1102 1103
}

1104
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
1105
{
1106
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
1107

1108
    //  Check whether the library haven't been shut down yet.
1109
    if (unlikely (ctx_terminated)) {
1110 1111 1112 1113
        errno = ETERM;
        return -1;
    }

1114
    //  Check whether message passed to the function is valid.
1115
    if (unlikely (!msg_ || !msg_->check ())) {
1116 1117 1118 1119
        errno = EFAULT;
        return -1;
    }

1120
    //  Process pending commands, if any.
1121
    int rc = process_commands (0, true);
somdoron's avatar
somdoron committed
1122
    if (unlikely (rc != 0)) {
1123
        return -1;
somdoron's avatar
somdoron committed
1124
    }
1125

1126 1127 1128
    //  Clear any user-visible flags that are set on the message.
    msg_->reset_flags (msg_t::more);

1129
    //  At this point we impose the flags on the message.
1130
    if (flags_ & ZMQ_SNDMORE)
1131
        msg_->set_flags (msg_t::more);
1132

1133 1134
    msg_->reset_metadata ();

1135
    //  Try to send the message using method in each socket class
1136
    rc = xsend (msg_);
somdoron's avatar
somdoron committed
1137
    if (rc == 0) {
1138
        return 0;
somdoron's avatar
somdoron committed
1139
    }
1140
    if (unlikely (errno != EAGAIN)) {
1141
        return -1;
somdoron's avatar
somdoron committed
1142
    }
Martin Sustrik's avatar
Martin Sustrik committed
1143

1144
    //  In case of non-blocking send we'll simply propagate
1145
    //  the error - including EAGAIN - up the stack.
1146
    if ((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0) {
Martin Sustrik's avatar
Martin Sustrik committed
1147
        return -1;
somdoron's avatar
somdoron committed
1148
    }
Martin Sustrik's avatar
Martin Sustrik committed
1149

1150
    //  Compute the time when the timeout should occur.
Matt Arsenault's avatar
Matt Arsenault committed
1151
    //  If the timeout is infinite, don't care.
1152 1153 1154
    int timeout = options.sndtimeo;
    uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);

1155 1156
    //  Oops, we couldn't send the message. Wait for the next
    //  command, process it and try to send the message again.
1157 1158
    //  If timeout is reached in the meantime, return EAGAIN.
    while (true) {
somdoron's avatar
somdoron committed
1159
        if (unlikely (process_commands (timeout, false) != 0)) {
1160
            return -1;
1161
        }
1162
        rc = xsend (msg_);
1163 1164
        if (rc == 0)
            break;
somdoron's avatar
somdoron committed
1165
        if (unlikely (errno != EAGAIN)) {
1166
            return -1;
somdoron's avatar
somdoron committed
1167
        }
1168
        if (timeout > 0) {
1169
            timeout = (int) (end - clock.now_ms ());
1170 1171 1172 1173 1174
            if (timeout <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
1175
    }
somdoron's avatar
somdoron committed
1176

Martin Sustrik's avatar
Martin Sustrik committed
1177
    return 0;
1178 1179
}

1180
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
1181
{
1182
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
1183

1184
    //  Check whether the library haven't been shut down yet.
1185
    if (unlikely (ctx_terminated)) {
1186 1187 1188 1189
        errno = ETERM;
        return -1;
    }

1190
    //  Check whether message passed to the function is valid.
1191
    if (unlikely (!msg_ || !msg_->check ())) {
1192 1193 1194 1195
        errno = EFAULT;
        return -1;
    }

1196 1197 1198 1199 1200 1201 1202
    //  Once every inbound_poll_rate messages check for signals and process
    //  incoming commands. This happens only if we are not polling altogether
    //  because there are messages available all the time. If poll occurs,
    //  ticks is set to zero and thus we avoid this code.
    //
    //  Note that 'recv' uses different command throttling algorithm (the one
    //  described above) from the one used by 'send'. This is because counting
Martin Sustrik's avatar
Martin Sustrik committed
1203
    //  ticks is more efficient than doing RDTSC all the time.
1204
    if (++ticks == inbound_poll_rate) {
somdoron's avatar
somdoron committed
1205
        if (unlikely (process_commands (0, false) != 0)) {
1206
            return -1;
somdoron's avatar
somdoron committed
1207
        }
1208 1209 1210
        ticks = 0;
    }

Martin Hurton's avatar
Martin Hurton committed
1211
    //  Get the message.
1212
    int rc = xrecv (msg_);
somdoron's avatar
somdoron committed
1213
    if (unlikely (rc != 0 && errno != EAGAIN)) {
Martin Hurton's avatar
Martin Hurton committed
1214
        return -1;
somdoron's avatar
somdoron committed
1215
    }
Martin Hurton's avatar
Martin Hurton committed
1216

1217
    //  If we have the message, return immediately.
1218
    if (rc == 0) {
1219
        extract_flags (msg_);
1220
        return 0;
1221
    }
1222

Martin Sustrik's avatar
Martin Sustrik committed
1223
    //  If the message cannot be fetched immediately, there are two scenarios.
1224
    //  For non-blocking recv, commands are processed in case there's an
Patrik Wenger's avatar
Patrik Wenger committed
1225
    //  activate_reader command already waiting in a command pipe.
1226
    //  If it's not, return EAGAIN.
1227
    if ((flags_ & ZMQ_DONTWAIT) || options.rcvtimeo == 0) {
somdoron's avatar
somdoron committed
1228
        if (unlikely (process_commands (0, false) != 0)) {
1229
            return -1;
somdoron's avatar
somdoron committed
1230
        }
1231
        ticks = 0;
1232

1233
        rc = xrecv (msg_);
somdoron's avatar
somdoron committed
1234
        if (rc < 0) {
1235
            return rc;
somdoron's avatar
somdoron committed
1236
        }
1237
        extract_flags (msg_);
somdoron's avatar
somdoron committed
1238

1239
        return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1240 1241
    }

1242
    //  Compute the time when the timeout should occur.
Matt Arsenault's avatar
Matt Arsenault committed
1243
    //  If the timeout is infinite, don't care.
1244 1245 1246
    int timeout = options.rcvtimeo;
    uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);

1247 1248
    //  In blocking scenario, commands are processed over and over again until
    //  we are able to fetch a message.
1249
    bool block = (ticks != 0);
1250
    while (true) {
somdoron's avatar
somdoron committed
1251
        if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
1252
            return -1;
somdoron's avatar
somdoron committed
1253
        }
1254
        rc = xrecv (msg_);
1255 1256 1257 1258
        if (rc == 0) {
            ticks = 0;
            break;
        }
somdoron's avatar
somdoron committed
1259
        if (unlikely (errno != EAGAIN)) {
1260
            return -1;
somdoron's avatar
somdoron committed
1261
        }
1262
        block = true;
1263
        if (timeout > 0) {
1264
            timeout = (int) (end - clock.now_ms ());
1265 1266 1267 1268 1269
            if (timeout <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
1270
    }
1271

1272
    extract_flags (msg_);
1273
    return 0;
1274 1275 1276 1277
}

int zmq::socket_base_t::close ()
{
1278
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
1279

1280 1281 1282 1283
    //  Remove all existing signalers for thread safe sockets
    if (thread_safe)
        ((mailbox_safe_t*)mailbox)->clear_signalers();

1284 1285
    //  Mark the socket as dead
    tag = 0xdeadbeef;
1286

1287

1288 1289 1290 1291
    //  Transfer the ownership of the socket from this application thread
    //  to the reaper thread which will take care of the rest of shutdown
    //  process.
    send_reap (this);
1292

1293 1294 1295
    return 0;
}

1296 1297 1298 1299 1300 1301 1302 1303 1304 1305
bool zmq::socket_base_t::has_in ()
{
    return xhas_in ();
}

bool zmq::socket_base_t::has_out ()
{
    return xhas_out ();
}

1306
void zmq::socket_base_t::start_reaping (poller_t *poller_)
1307
{
1308
    //  Plug the socket to the reaper thread.
1309
    poller = poller_;
somdoron's avatar
somdoron committed
1310 1311 1312 1313 1314

    fd_t fd;

    if (!thread_safe)
        fd = ((mailbox_t*)mailbox)->get_fd();
1315
    else {
1316
        scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
1317

1318
        reaper_signaler = new (std::nothrow) signaler_t();
1319
        zmq_assert (reaper_signaler);
1320

somdoron's avatar
somdoron committed
1321
        //  Add signaler to the safe mailbox
1322
        fd = reaper_signaler->get_fd();
1323
        ((mailbox_safe_t*)mailbox)->add_signaler(reaper_signaler);
somdoron's avatar
somdoron committed
1324 1325

        //  Send a signal to make sure reaper handle existing commands
1326
        reaper_signaler->send();
somdoron's avatar
somdoron committed
1327 1328 1329 1330

    }

    handle = poller->add_fd (fd, this);
1331
    poller->set_pollin (handle);
1332 1333 1334 1335 1336

    //  Initialise the termination and check whether it can be deallocated
    //  immediately.
    terminate ();
    check_destroy ();
Martin Hurton's avatar
Martin Hurton committed
1337 1338
}

1339
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
Martin Sustrik's avatar
Martin Sustrik committed
1340
{
1341
    int rc;
1342
    command_t cmd;
1343 1344 1345
    if (timeout_ != 0) {

        //  If we are asked to wait, simply ask mailbox to wait.
somdoron's avatar
somdoron committed
1346
        rc = mailbox->recv (&cmd, timeout_);
1347 1348
    }
    else {
1349

1350 1351 1352
        //  If we are asked not to wait, check whether we haven't processed
        //  commands recently, so that we can throttle the new commands.

Martin Sustrik's avatar
Martin Sustrik committed
1353
        //  Get the CPU's tick counter. If 0, the counter is not available.
Martin Hurton's avatar
Martin Hurton committed
1354
        const uint64_t tsc = zmq::clock_t::rdtsc ();
Martin Sustrik's avatar
Martin Sustrik committed
1355

1356 1357 1358 1359 1360 1361
        //  Optimised version of command processing - it doesn't have to check
        //  for incoming commands each time. It does so only if certain time
        //  elapsed since last command processing. Command delay varies
        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
        //  etc. The optimisation makes sense only on platforms where getting
        //  a timestamp is a very cheap operation (tens of nanoseconds).
Martin Sustrik's avatar
Martin Sustrik committed
1362 1363
        if (tsc && throttle_) {

Martin Sustrik's avatar
Martin Sustrik committed
1364 1365 1366
            //  Check whether TSC haven't jumped backwards (in case of migration
            //  between CPU cores) and whether certain time have elapsed since
            //  last command processing. If it didn't do nothing.
Martin Sustrik's avatar
Martin Sustrik committed
1367
            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
1368
                return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1369
            last_tsc = tsc;
1370 1371 1372
        }

        //  Check whether there are any commands pending for this thread.
somdoron's avatar
somdoron committed
1373
        rc = mailbox->recv (&cmd, 0);
1374
    }
Martin Sustrik's avatar
Martin Sustrik committed
1375

1376 1377
    //  Process all available commands.
    while (rc == 0) {
1378
        cmd.destination->process_command (cmd);
somdoron's avatar
somdoron committed
1379
        rc = mailbox->recv (&cmd, 0);
1380 1381 1382 1383 1384 1385
    }

    if (errno == EINTR)
        return -1;

    zmq_assert (errno == EAGAIN);
1386

1387
    if (ctx_terminated) {
1388 1389
        errno = ETERM;
        return -1;
1390
    }
1391 1392

    return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1393 1394
}

1395
void zmq::socket_base_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
1396
{
1397
    //  Here, someone have called zmq_ctx_term while the socket was still alive.
1398
    //  We'll remember the fact so that any blocking call is interrupted and any
1399 1400
    //  further attempt to use the socket will return ETERM. The user is still
    //  responsible for calling zmq_close on the socket though!
1401 1402 1403
    scoped_lock_t lock(monitor_sync);
    stop_monitor ();
    
1404
    ctx_terminated = true;
Martin Sustrik's avatar
Martin Sustrik committed
1405 1406
}

1407
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
Martin Sustrik's avatar
Martin Sustrik committed
1408
{
1409
    attach_pipe (pipe_);
Martin Sustrik's avatar
Martin Sustrik committed
1410 1411
}

1412
void zmq::socket_base_t::process_term (int linger_)
1413
{
1414 1415 1416 1417 1418
    //  Unregister all inproc endpoints associated with this socket.
    //  Doing this we make sure that no new pipes from other sockets (inproc)
    //  will be initiated.
    unregister_endpoints (this);

1419 1420
    //  Ask all attached pipes to terminate.
    for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
1421
        pipes [i]->terminate (false);
Martin Sustrik's avatar
Martin Sustrik committed
1422
    register_term_acks ((int) pipes.size ());
1423

1424
    //  Continue the termination process immediately.
1425
    own_t::process_term (linger_);
1426 1427
}

1428 1429 1430 1431 1432 1433
void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
{
    term_endpoint (endpoint_->c_str());
    delete endpoint_;
}

1434 1435
void zmq::socket_base_t::update_pipe_options(int option_)
{
1436 1437 1438 1439 1440
    if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM)
    {
        for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
        {
            pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
1441
            pipes[i]->send_hwms_to_peer(options.sndhwm, options.rcvhwm);
1442 1443
        }
    }
1444 1445 1446

}

1447 1448 1449 1450 1451
void zmq::socket_base_t::process_destroy ()
{
    destroyed = true;
}

1452
int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
1453 1454 1455 1456
{
    errno = EINVAL;
    return -1;
}
1457 1458 1459 1460 1461 1462

bool zmq::socket_base_t::xhas_out ()
{
    return false;
}

1463
int zmq::socket_base_t::xsend (msg_t *)
1464 1465 1466 1467 1468 1469 1470 1471 1472 1473
{
    errno = ENOTSUP;
    return -1;
}

bool zmq::socket_base_t::xhas_in ()
{
    return false;
}

somdoron's avatar
somdoron committed
1474 1475
int zmq::socket_base_t::xjoin (const char *group_)
{
1476
    LIBZMQ_UNUSED (group_);
somdoron's avatar
somdoron committed
1477 1478 1479 1480 1481 1482
    errno = ENOTSUP;
    return -1;
}

int zmq::socket_base_t::xleave (const char *group_)
{
1483
    LIBZMQ_UNUSED (group_);
somdoron's avatar
somdoron committed
1484 1485 1486 1487
    errno = ENOTSUP;
    return -1;
}

1488
int zmq::socket_base_t::xrecv (msg_t *)
1489 1490 1491 1492 1493
{
    errno = ENOTSUP;
    return -1;
}

1494 1495 1496
static const zmq::blob_t empty_blob;

const zmq::blob_t &zmq::socket_base_t::get_credential () const
1497
{
1498
    return empty_blob;
1499 1500
}

1501
void zmq::socket_base_t::xread_activated (pipe_t *)
1502 1503 1504
{
    zmq_assert (false);
}
1505
void zmq::socket_base_t::xwrite_activated (pipe_t *)
1506 1507 1508 1509
{
    zmq_assert (false);
}

1510
void zmq::socket_base_t::xhiccuped (pipe_t *)
1511
{
1512
    zmq_assert (false);
1513 1514
}

1515 1516
void zmq::socket_base_t::in_event ()
{
1517 1518 1519 1520
    //  This function is invoked only once the socket is running in the context
    //  of the reaper thread. Process any commands from other threads/sockets
    //  that may be available at the moment. Ultimately, the socket will
    //  be destroyed.
1521 1522
  {
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
1523

1524 1525 1526
    //  If the socket is thread safe we need to unsignal the reaper signaler
    if (thread_safe)
        reaper_signaler->recv();
1527

1528
    process_commands (0, false);
1529
  }
1530
    check_destroy();
1531 1532 1533 1534 1535 1536 1537
}

void zmq::socket_base_t::out_event ()
{
    zmq_assert (false);
}

1538
void zmq::socket_base_t::timer_event (int)
1539 1540 1541
{
    zmq_assert (false);
}
1542

1543 1544
void zmq::socket_base_t::check_destroy ()
{
1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560
    //  If the object was already marked as destroyed, finish the deallocation.
    if (destroyed) {

        //  Remove the socket from the reaper's poller.
        poller->rm_fd (handle);

        //  Remove the socket from the context.
        destroy_socket (this);

        //  Notify the reaper about the fact.
        send_reaped ();

        //  Deallocate.
        own_t::process_destroy ();
    }
}
1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571

void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
    xread_activated (pipe_);
}

void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
    xwrite_activated (pipe_);
}

1572 1573
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
{
1574
    if (options.immediate == 1)
1575 1576 1577 1578
        pipe_->terminate (false);
    else
        // Notify derived sockets of the hiccup
        xhiccuped (pipe_);
1579 1580
}

1581
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
1582 1583
{
    //  Notify the specific socket type about the pipe termination.
1584
    xpipe_terminated (pipe_);
1585

1586
    // Remove pipe from inproc pipes
Martin Hurton's avatar
Martin Hurton committed
1587
    for (inprocs_t::iterator it = inprocs.begin (); it != inprocs.end (); ++it)
1588
        if (it->second == pipe_) {
Martin Hurton's avatar
Martin Hurton committed
1589
            inprocs.erase (it);
1590
            break;
1591 1592
        }

1593 1594 1595 1596 1597 1598 1599
    //  Remove the pipe from the list of attached pipes and confirm its
    //  termination if we are already shutting down.
    pipes.erase (pipe_);
    if (is_terminating ())
        unregister_term_ack ();
}

1600 1601
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
1602 1603 1604
    //  Test whether routing_id flag is valid for this socket type.
    if (unlikely (msg_->flags () & msg_t::routing_id))
        zmq_assert (options.recv_routing_id);
Martin Hurton's avatar
Martin Hurton committed
1605

1606
    //  Remove MORE flag.
1607 1608
    rcvmore = msg_->flags () & msg_t::more ? true : false;
}
1609

1610
int zmq::socket_base_t::monitor (const char *addr_, int events_)
1611
{
1612 1613
    scoped_lock_t lock(monitor_sync);
    
1614 1615 1616 1617
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }
1618

1619
    //  Support deregistering monitoring endpoints as well
1620 1621 1622 1623 1624 1625 1626
    if (addr_ == NULL) {
        stop_monitor ();
        return 0;
    }
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
Pieter Hintjens's avatar
Pieter Hintjens committed
1627
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
1628 1629
        return -1;

1630
    //  Event notification only supported over inproc://
1631 1632 1633 1634
    if (protocol != "inproc") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
1635 1636 1637 1638
    // already monitoring. Stop previous monitor before starting new one.
    if (monitor_socket != NULL) {
        stop_monitor (true);
    }
1639
    //  Register events to monitor
1640
    monitor_events = events_;
Martin Hurton's avatar
Martin Hurton committed
1641
    monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
1642 1643 1644
    if (monitor_socket == NULL)
        return -1;

1645
    //  Never block context termination on pending event messages
1646
    int linger = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
1647
    int rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1648
    if (rc == -1)
1649
        stop_monitor (false);
1650

1651
    //  Spawn the monitor socket endpoint
1652 1653
    rc = zmq_bind (monitor_socket, addr_);
    if (rc == -1)
1654
         stop_monitor (false);
1655 1656 1657
    return rc;
}

1658
void zmq::socket_base_t::event_connected (const std::string &addr_, zmq::fd_t fd_)
1659
{
1660
    event(addr_, fd_, ZMQ_EVENT_CONNECTED);
1661
}
1662

Martin Hurton's avatar
Martin Hurton committed
1663
void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_)
1664
{
1665
    event(addr_, err_, ZMQ_EVENT_CONNECT_DELAYED);
1666
}
1667

Martin Hurton's avatar
Martin Hurton committed
1668
void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_)
1669
{
1670
    event(addr_, interval_, ZMQ_EVENT_CONNECT_RETRIED);
1671 1672
}

1673
void zmq::socket_base_t::event_listening (const std::string &addr_, zmq::fd_t fd_)
1674
{
1675
    event(addr_, fd_, ZMQ_EVENT_LISTENING);
1676 1677
}

Martin Hurton's avatar
Martin Hurton committed
1678
void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
1679
{
1680
    event(addr_, err_, ZMQ_EVENT_BIND_FAILED);
1681 1682
}

1683
void zmq::socket_base_t::event_accepted (const std::string &addr_, zmq::fd_t fd_)
1684
{
1685
    event(addr_, fd_, ZMQ_EVENT_ACCEPTED);
1686 1687
}

Martin Hurton's avatar
Martin Hurton committed
1688
void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_)
1689
{
1690
    event(addr_, err_, ZMQ_EVENT_ACCEPT_FAILED);
1691 1692
}

1693
void zmq::socket_base_t::event_closed (const std::string &addr_, zmq::fd_t fd_)
1694
{
1695
    event(addr_, fd_, ZMQ_EVENT_CLOSED);
1696
}
Martin Hurton's avatar
Martin Hurton committed
1697 1698

void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
1699
{
1700
    event(addr_, err_, ZMQ_EVENT_CLOSE_FAILED);
1701 1702
}

1703
void zmq::socket_base_t::event_disconnected (const std::string &addr_, zmq::fd_t fd_)
1704
{
1705 1706 1707
    event(addr_, fd_, ZMQ_EVENT_DISCONNECTED);
}

1708 1709
void zmq::socket_base_t::event_handshake_failed_no_detail (
  const std::string &addr_, int err_)
Vincent Tellier's avatar
Vincent Tellier committed
1710
{
1711
    event (addr_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
Vincent Tellier's avatar
Vincent Tellier committed
1712 1713
}

1714 1715
void zmq::socket_base_t::event_handshake_failed_protocol (
  const std::string &addr_, int err_)
1716
{
1717
    event (addr_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
1718 1719
}

1720 1721
void zmq::socket_base_t::event_handshake_failed_auth (const std::string &addr_,
                                                      int err_)
1722
{
1723
    event (addr_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
1724 1725 1726 1727 1728 1729
}

void zmq::socket_base_t::event_handshake_succeeded (const std::string &addr_,
                                                    int err_)
{
    event (addr_, err_, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
Vincent Tellier's avatar
Vincent Tellier committed
1730 1731
}

1732
void zmq::socket_base_t::event(const std::string &addr_, intptr_t value_, int type_)
1733 1734 1735 1736
{
    scoped_lock_t lock(monitor_sync);
    if (monitor_events & type_)
    {
1737
        monitor_event (type_, value_, addr_);
1738
    }
1739 1740
}

1741
//  Send a monitor event
1742
void zmq::socket_base_t::monitor_event (int event_, intptr_t value_, const std::string &addr_)
1743
{
1744 1745 1746
    // this is a private method which is only called from
    // contexts where the mutex has been locked before

1747
    if (monitor_socket) {
1748
        //  Send event in first frame
1749
        zmq_msg_t msg;
1750 1751
        zmq_msg_init_size (&msg, 6);
        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
1752 1753 1754 1755 1756
        //  Avoid dereferencing uint32_t on unaligned address
        uint16_t event = (uint16_t) event_;
        uint32_t value = (uint32_t) value_;
        memcpy (data + 0, &event, sizeof(event));
        memcpy (data + 2, &value, sizeof(value));
1757
        zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
1758 1759

        //  Send address in second frame
1760
        zmq_msg_init_size (&msg, addr_.size());
1761
        memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
1762 1763
        zmq_sendmsg (monitor_socket, &msg, 0);
    }
1764 1765
}

1766
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
1767
{
1768 1769 1770
    // this is a private method which is only called from
    // contexts where the mutex has been locked before

1771
    if (monitor_socket) {
1772
        if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_)
1773
            monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
1774 1775 1776 1777
        zmq_close (monitor_socket);
        monitor_socket = NULL;
        monitor_events = 0;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
1778
}