socket_base.cpp 50.8 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include <new>
32
#include <string>
33 34
#include <algorithm>

35
#include "macros.hpp"
36 37 38

#if defined ZMQ_HAVE_WINDOWS
#if defined _MSC_VER
39
#if defined _WIN32_WCE
boris@boressoft.ru's avatar
boris@boressoft.ru committed
40 41
#include <cmnintrin.h>
#else
42 43
#include <intrin.h>
#endif
boris@boressoft.ru's avatar
boris@boressoft.ru committed
44
#endif
45 46
#else
#include <unistd.h>
47
#include <ctype.h>
48
#endif
49

50
#include "socket_base.hpp"
51
#include "tcp_listener.hpp"
52
#include "ipc_listener.hpp"
53
#include "tipc_listener.hpp"
54
#include "tcp_connecter.hpp"
55
#include "io_thread.hpp"
56
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
57
#include "config.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
58
#include "pipe.hpp"
59
#include "err.hpp"
60
#include "ctx.hpp"
61
#include "likely.hpp"
62
#include "msg.hpp"
63 64 65
#include "address.hpp"
#include "ipc_address.hpp"
#include "tcp_address.hpp"
66
#include "udp_address.hpp"
67
#include "tipc_address.hpp"
somdoron's avatar
somdoron committed
68 69
#include "mailbox.hpp"
#include "mailbox_safe.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
70 71 72 73 74 75

#if defined ZMQ_HAVE_VMCI
#include "vmci_address.hpp"
#include "vmci_listener.hpp"
#endif

76 77 78
#ifdef ZMQ_HAVE_OPENPGM
#include "pgm_socket.hpp"
#endif
79

80 81 82 83 84 85 86
#include "pair.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "pull.hpp"
#include "push.hpp"
87 88
#include "dealer.hpp"
#include "router.hpp"
89 90
#include "xpub.hpp"
#include "xsub.hpp"
91
#include "stream.hpp"
92
#include "server.hpp"
93
#include "client.hpp"
somdoron's avatar
somdoron committed
94 95
#include "radio.hpp"
#include "dish.hpp"
somdoron's avatar
somdoron committed
96 97
#include "gather.hpp"
#include "scatter.hpp"
98
#include "dgram.hpp"
99

somdoron's avatar
somdoron committed
100 101


102 103 104 105 106
bool zmq::socket_base_t::check_tag ()
{
    //  The constructor stamps every socket with this magic value; comparing
    //  against it tells whether the object still carries that stamp.
    const bool tag_matches = (tag == 0xbaddecaf);
    return tag_matches;
}

107
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
    uint32_t tid_, int sid_)
{
    //  Instantiate the concrete socket class implementing the requested
    //  socket type. An unknown type is reported as EINVAL and NULL is
    //  returned.
    socket_base_t *sock = NULL;
    switch (type_) {
        case ZMQ_PAIR:
            sock = new (std::nothrow) pair_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUB:
            sock = new (std::nothrow) pub_t (parent_, tid_, sid_);
            break;
        case ZMQ_SUB:
            sock = new (std::nothrow) sub_t (parent_, tid_, sid_);
            break;
        case ZMQ_REQ:
            sock = new (std::nothrow) req_t (parent_, tid_, sid_);
            break;
        case ZMQ_REP:
            sock = new (std::nothrow) rep_t (parent_, tid_, sid_);
            break;
        case ZMQ_DEALER:
            sock = new (std::nothrow) dealer_t (parent_, tid_, sid_);
            break;
        case ZMQ_ROUTER:
            sock = new (std::nothrow) router_t (parent_, tid_, sid_);
            break;
        case ZMQ_PULL:
            sock = new (std::nothrow) pull_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUSH:
            sock = new (std::nothrow) push_t (parent_, tid_, sid_);
            break;
        case ZMQ_XPUB:
            sock = new (std::nothrow) xpub_t (parent_, tid_, sid_);
            break;
        case ZMQ_XSUB:
            sock = new (std::nothrow) xsub_t (parent_, tid_, sid_);
            break;
        case ZMQ_STREAM:
            sock = new (std::nothrow) stream_t (parent_, tid_, sid_);
            break;
        case ZMQ_SERVER:
            sock = new (std::nothrow) server_t (parent_, tid_, sid_);
            break;
        case ZMQ_CLIENT:
            sock = new (std::nothrow) client_t (parent_, tid_, sid_);
            break;
        case ZMQ_RADIO:
            sock = new (std::nothrow) radio_t (parent_, tid_, sid_);
            break;
        case ZMQ_DISH:
            sock = new (std::nothrow) dish_t (parent_, tid_, sid_);
            break;
        case ZMQ_GATHER:
            sock = new (std::nothrow) gather_t (parent_, tid_, sid_);
            break;
        case ZMQ_SCATTER:
            sock = new (std::nothrow) scatter_t (parent_, tid_, sid_);
            break;
        case ZMQ_DGRAM:
            sock = new (std::nothrow) dgram_t (parent_, tid_, sid_);
            break;
        default:
            errno = EINVAL;
            return NULL;
    }

    alloc_assert (sock);

    //  Normal case: the constructor managed to set up a mailbox.
    if (sock->mailbox != NULL)
        return sock;

    //  The constructor leaves the mailbox NULL when the mailbox's file
    //  descriptor is retired. Such a socket is unusable, so dispose of it.
    //  The destructor asserts on 'destroyed', hence the flag is set first.
    sock->destroyed = true;
    LIBZMQ_DELETE(sock);
    return NULL;
}

somdoron's avatar
somdoron committed
185
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
    own_t (parent_, tid_),
    tag (0xbaddecaf),
    ctx_terminated (false),
    destroyed (false),
    poller(NULL),
    handle((poller_t::handle_t)NULL),
    last_tsc (0),
    ticks (0),
    rcvmore (false),
    monitor_socket (NULL),
    monitor_events (0),
    thread_safe (thread_safe_),
    reaper_signaler (NULL),
    sync(),
    monitor_sync()
{
    //  Seed the per-socket options from the constructor arguments and from
    //  the context-wide settings.
    options.socket_id = sid_;
    options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);

    //  Linger is infinite (-1) when the context's ZMQ_BLOCKY option is set,
    //  zero otherwise.
    if (parent_->get (ZMQ_BLOCKY))
        options.linger = -1;
    else
        options.linger = 0;

    //  Thread-safe sockets use a mailbox guarded by the socket's own mutex.
    if (thread_safe) {
        mailbox = new (std::nothrow) mailbox_safe_t(&sync);
        zmq_assert (mailbox);
        return;
    }

    //  Ordinary sockets use the plain mailbox. If its file descriptor is
    //  retired the mailbox is discarded and the member is left NULL, which
    //  create () checks for.
    mailbox_t *plain_mailbox = new (std::nothrow) mailbox_t();
    zmq_assert (plain_mailbox);

    if (plain_mailbox->get_fd () == retired_fd) {
        LIBZMQ_DELETE (plain_mailbox);
        mailbox = NULL;
    }
    else
        mailbox = plain_mailbox;
}

zmq::socket_base_t::~socket_base_t ()
{
    //  Release the objects owned by the socket, if they were ever created.
    if (mailbox != NULL) {
        LIBZMQ_DELETE(mailbox);
    }

    if (reaper_signaler != NULL) {
        LIBZMQ_DELETE(reaper_signaler);
    }

    //  Shut the monitor down while holding the monitor mutex.
    scoped_lock_t lock(monitor_sync);
    stop_monitor ();

    //  The socket must have been flagged as destroyed before it is deleted.
    zmq_assert (destroyed);
}

somdoron's avatar
somdoron committed
238
zmq::i_mailbox *zmq::socket_base_t::get_mailbox ()
{
    //  Hand out the mailbox created by the constructor (may be NULL if
    //  construction of the mailbox failed).
    i_mailbox *const current_mailbox = mailbox;
    return current_mailbox;
}

void zmq::socket_base_t::stop ()
{
245 246
    //  Called by ctx when it is terminated (zmq_ctx_term).
    //  'stop' command is sent from the threads that called zmq_ctx_term to
247 248 249 250 251
    //  the thread owning the socket. This way, blocking call in the
    //  owner thread can be interrupted.
    send_stop ();
}

252 253 254 255 256 257 258 259 260 261 262 263 264
int zmq::socket_base_t::parse_uri (const char *uri_,
                        std::string &protocol_, std::string &address_)
{
    zmq_assert (uri_ != NULL);

    //  An endpoint URI has the shape "<protocol>://<address>". Locate the
    //  separator; without it the URI is malformed.
    const std::string whole_uri (uri_);
    const std::string::size_type separator_at = whole_uri.find ("://");
    if (separator_at == std::string::npos) {
        errno = EINVAL;
        return -1;
    }

    //  Split around the three-character separator.
    protocol_ = whole_uri.substr (0, separator_at);
    address_ = whole_uri.substr (separator_at + 3);

    //  Both halves have to be non-empty for the URI to be usable.
    const bool both_present = !protocol_.empty () && !address_.empty ();
    if (both_present)
        return 0;

    errno = EINVAL;
    return -1;
}

273 274
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
275
    //  First check out whether the protocol is something we are aware of.
Pieter Hintjens's avatar
Pieter Hintjens committed
276
    if (protocol_ != "inproc"
277
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
Pieter Hintjens's avatar
Pieter Hintjens committed
278
    &&  protocol_ != "ipc"
279
#endif
Pieter Hintjens's avatar
Pieter Hintjens committed
280
    &&  protocol_ != "tcp"
281 282
#if defined ZMQ_HAVE_OPENPGM
    //  pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
Pieter Hintjens's avatar
Pieter Hintjens committed
283 284
    &&  protocol_ != "pgm"
    &&  protocol_ != "epgm"
285 286 287
#endif
#if defined ZMQ_HAVE_TIPC
    // TIPC transport is only available on Linux.
Pieter Hintjens's avatar
Pieter Hintjens committed
288
    &&  protocol_ != "tipc"
289
#endif
290 291
#if defined ZMQ_HAVE_NORM
    &&  protocol_ != "norm"
292
#endif
293 294
#if defined ZMQ_HAVE_VMCI
    &&  protocol_ != "vmci"
295
#endif
296
    &&  protocol_ != "udp") {
Ilya Kulakov's avatar
Ilya Kulakov committed
297 298 299 300
        errno = EPROTONOSUPPORT;
        return -1;
    }

301 302 303
    //  Check whether socket type and transport protocol match.
    //  Specifically, multicast protocols can't be combined with
    //  bi-directional messaging patterns (socket types).
304
#if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
bebopagogo's avatar
bebopagogo committed
305
    if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") &&
306 307
          options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
          options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
308 309 310
        errno = ENOCOMPATPROTO;
        return -1;
    }
311
#endif
312

313
    if (protocol_ == "udp" && (options.type != ZMQ_DISH &&
314 315
                               options.type != ZMQ_RADIO &&
                               options.type != ZMQ_DGRAM)) {
316
        errno = ENOCOMPATPROTO;
317
        return -1;
318
    }
319

320 321 322 323
    //  Protocol is available.
    return 0;
}

324
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
    //  Become the pipe's event sink and remember the pipe, so that it can
    //  be terminated later on.
    pipe_->set_event_sink (this);
    pipes.push_back (pipe_);

    //  Give the concrete socket type a chance to react to the new pipe.
    xattach_pipe (pipe_, subscribe_to_all_);

    //  Nothing more to do unless the socket is already shutting down.
    if (!is_terminating ())
        return;

    //  The socket is being closed: ask the new pipe to terminate straight
    //  away and account for the termination ack it will produce.
    register_term_acks (1);
    pipe_->terminate (false);
}

341
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
342
    size_t optvallen_)
343
{
344
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
345

346 347 348 349
    if (!options.is_valid(option_)) {
        errno = EINVAL;
        return -1;
    }
350

351
    if (unlikely (ctx_terminated)) {
352 353 354 355
        errno = ETERM;
        return -1;
    }

356 357
    //  First, check whether specific socket type overloads the option.
    int rc = xsetsockopt (option_, optval_, optvallen_);
somdoron's avatar
somdoron committed
358
    if (rc == 0 || errno != EINVAL) {
359
        return rc;
somdoron's avatar
somdoron committed
360
    }
361 362 363

    //  If the socket type doesn't support the option, pass it to
    //  the generic option parser.
somdoron's avatar
somdoron committed
364
    rc = options.setsockopt (option_, optval_, optvallen_);
365
    update_pipe_options(option_);
somdoron's avatar
somdoron committed
366 367

    return rc;
368 369
}

370 371 372
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
    size_t *optvallen_)
{
373
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
374

375
    if (unlikely (ctx_terminated)) {
376 377 378 379
        errno = ETERM;
        return -1;
    }

380
    if (option_ == ZMQ_RCVMORE) {
381
        if (*optvallen_ < sizeof (int)) {
382 383 384
            errno = EINVAL;
            return -1;
        }
385
        memset(optval_, 0, *optvallen_);
386 387
        *((int*) optval_) = rcvmore ? 1 : 0;
        *optvallen_ = sizeof (int);
388 389 390
        return 0;
    }

391 392 393 394 395
    if (option_ == ZMQ_FD) {
        if (*optvallen_ < sizeof (fd_t)) {
            errno = EINVAL;
            return -1;
        }
somdoron's avatar
somdoron committed
396 397 398 399 400 401

        if (thread_safe) {
            // thread safe socket doesn't provide file descriptor
            errno = EINVAL;
            return -1;
        }
402

somdoron's avatar
somdoron committed
403
        *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
404 405
        *optvallen_ = sizeof(fd_t);

406 407 408 409
        return 0;
    }

    if (option_ == ZMQ_EVENTS) {
410
        if (*optvallen_ < sizeof (int)) {
411 412 413
            errno = EINVAL;
            return -1;
        }
414
        int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
415
        if (rc != 0 && (errno == EINTR || errno == ETERM)) {
416
            return -1;
somdoron's avatar
somdoron committed
417
        }
418
        errno_assert (rc == 0);
419
        *((int*) optval_) = 0;
420
        if (has_out ())
421
            *((int*) optval_) |= ZMQ_POLLOUT;
422
        if (has_in ())
423 424
            *((int*) optval_) |= ZMQ_POLLIN;
        *optvallen_ = sizeof (int);
425 426 427
        return 0;
    }

428 429 430 431 432
    if (option_ == ZMQ_LAST_ENDPOINT) {
        if (*optvallen_ < last_endpoint.size () + 1) {
            errno = EINVAL;
            return -1;
        }
433
        strncpy(static_cast <char *> (optval_), last_endpoint.c_str(), last_endpoint.size() + 1);
434 435 436 437
        *optvallen_ = last_endpoint.size () + 1;
        return 0;
    }

438 439 440 441 442 443
    if (option_ == ZMQ_THREAD_SAFE) {
        if (*optvallen_ < sizeof (int)) {
            errno = EINVAL;
            return -1;
        }
        memset(optval_, 0, *optvallen_);
444
        *((int*) optval_) = thread_safe ? 1 : 0;
445 446
        *optvallen_ = sizeof (int);
        return 0;
Ilya Kulakov's avatar
Ilya Kulakov committed
447
    }
448

somdoron's avatar
somdoron committed
449 450
    int rc = options.getsockopt (option_, optval_, optvallen_);
    return rc;
451 452
}

somdoron's avatar
somdoron committed
453 454
int zmq::socket_base_t::join (const char* group_)
{
455
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
456 457 458 459 460 461 462 463 464

    int rc = xjoin (group_);


    return rc;
}

int zmq::socket_base_t::leave (const char* group_)
{
465
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
466 467 468 469 470 471 472

    int rc = xleave (group_);


    return rc;
}

473 474
int zmq::socket_base_t::add_signaler(signaler_t *s_)
{
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);

    //  Only thread-safe sockets own a mailbox_safe_t, which is the mailbox
    //  type that accepts signalers.
    if (thread_safe) {
        ((mailbox_safe_t*)mailbox)->add_signaler(s_);
        return 0;
    }

    errno = EINVAL;
    return -1;
}

int zmq::socket_base_t::remove_signaler(signaler_t *s_)
{
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);

    //  Only thread-safe sockets own a mailbox_safe_t, which is the mailbox
    //  type that holds signalers.
    if (thread_safe) {
        ((mailbox_safe_t*)mailbox)->remove_signaler(s_);
        return 0;
    }

    errno = EINVAL;
    return -1;
}

501 502
int zmq::socket_base_t::bind (const char *addr_)
{
503
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
504

505
    if (unlikely (ctx_terminated)) {
506 507 508 509
        errno = ETERM;
        return -1;
    }

510 511
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
512
    if (unlikely (rc != 0)) {
513
        return -1;
somdoron's avatar
somdoron committed
514
    }
515

516
    //  Parse addr_ string.
517 518
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
519
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
520
        return -1;
somdoron's avatar
somdoron committed
521
    }
522

Pieter Hintjens's avatar
Pieter Hintjens committed
523
    if (protocol == "inproc") {
524
        const endpoint_t endpoint = { this, options };
525
        rc = register_endpoint (addr_, endpoint);
526
        if (rc == 0) {
Martin Hurton's avatar
Martin Hurton committed
527
            connect_pending (addr_, this);
528
            last_endpoint.assign (addr_);
529
            options.connected = true;
530 531
        }
        return rc;
532
    }
533

534
    if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
535
        //  For convenience's sake, bind can be used interchangeable with
536
        //  connect for PGM, EPGM, NORM transports.
537 538 539 540
        rc = connect (addr_);
        if (rc != -1)
            options.connected = true;
        return rc;
541 542
    }

543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596
    if (protocol == "udp") {
        if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
            errno = ENOCOMPATPROTO;
            return -1;
        }

        //  Choose the I/O thread to run the session in.
        io_thread_t *io_thread = choose_io_thread (options.affinity);
        if (!io_thread) {
            errno = EMTHREAD;
            return -1;
        }

        address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ());
        alloc_assert (paddr);

        paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
        alloc_assert (paddr->resolved.udp_addr);
        rc = paddr->resolved.udp_addr->resolve (address.c_str(), true);
        if (rc != 0) {
            LIBZMQ_DELETE(paddr);
            return -1;
        }

        session_base_t *session = session_base_t::create (io_thread, true, this,
            options, paddr);
        errno_assert (session);

        pipe_t *newpipe = NULL;

        //  Create a bi-directional pipe.
        object_t *parents [2] = {this, session};
        pipe_t *new_pipes [2] = {NULL, NULL};

        int hwms [2] = {options.sndhwm, options.rcvhwm};
        bool conflates [2] = {false, false};
        rc = pipepair (parents, new_pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  Attach local end of the pipe to the socket object.
        attach_pipe (new_pipes [0], true);
        newpipe = new_pipes [0];

        //  Attach remote end of the pipe to the session object later on.
        session->attach_pipe (new_pipes [1]);

        //  Save last endpoint URI
        paddr->to_string (last_endpoint);

        add_endpoint (addr_, (own_t *) session, newpipe);

        return 0;
    }

597
    //  Remaining transports require to be run in an I/O thread, so at this
598 599 600 601 602 603
    //  point we'll choose one.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        return -1;
    }
604

605
    if (protocol == "tcp") {
606
        tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
607
            io_thread, this, options);
608
        alloc_assert (listener);
609
        rc = listener->set_address (address.c_str ());
610
        if (rc != 0) {
611
            LIBZMQ_DELETE(listener);
612
            event_bind_failed (address, zmq_errno());
613
            return -1;
614
        }
615

616
        // Save last endpoint URI
617
        listener->get_address (last_endpoint);
618

619
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
620
        options.connected = true;
621 622 623
        return 0;
    }

624
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
625 626 627 628
    if (protocol == "ipc") {
        ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
629
        int rc = listener->set_address (address.c_str ());
630
        if (rc != 0) {
631
            LIBZMQ_DELETE(listener);
632
            event_bind_failed (address, zmq_errno());
633 634
            return -1;
        }
635

636
        // Save last endpoint URI
637
        listener->get_address (last_endpoint);
638

639
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
640
        options.connected = true;
641
        return 0;
642
    }
643
#endif
644
#if defined ZMQ_HAVE_TIPC
645 646 647 648 649 650
    if (protocol == "tipc") {
         tipc_listener_t *listener = new (std::nothrow) tipc_listener_t (
              io_thread, this, options);
         alloc_assert (listener);
         int rc = listener->set_address (address.c_str ());
         if (rc != 0) {
651
             LIBZMQ_DELETE(listener);
652 653 654 655 656 657 658 659
             event_bind_failed (address, zmq_errno());
             return -1;
         }

        // Save last endpoint URI
        listener->get_address (last_endpoint);

        add_endpoint (addr_, (own_t *) listener, NULL);
660
        options.connected = true;
661 662 663
        return 0;
    }
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682
#if defined ZMQ_HAVE_VMCI
    if (protocol == "vmci") {
        vmci_listener_t *listener = new (std::nothrow) vmci_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
        int rc = listener->set_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(listener);
            event_bind_failed (address, zmq_errno ());
            return -1;
        }

        listener->get_address (last_endpoint);

        add_endpoint (last_endpoint.c_str(), (own_t *) listener, NULL);
        options.connected = true;
        return 0;
    }
#endif
683

684
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
685
    return -1;
686 687
}

688
int zmq::socket_base_t::connect (const char *addr_)
689
{
690
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
691

692
    if (unlikely (ctx_terminated)) {
693 694 695 696
        errno = ETERM;
        return -1;
    }

697 698
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
699
    if (unlikely (rc != 0)) {
700
        return -1;
somdoron's avatar
somdoron committed
701
    }
702

malosek's avatar
malosek committed
703
    //  Parse addr_ string.
704 705
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
706
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
707
        return -1;
somdoron's avatar
somdoron committed
708
    }
malosek's avatar
malosek committed
709

Pieter Hintjens's avatar
Pieter Hintjens committed
710
    if (protocol == "inproc") {
711

712 713 714 715
        //  TODO: inproc connect is specific with respect to creating pipes
        //  as there's no 'reconnect' functionality implemented. Once that
        //  is in place we should follow generic pipe creation algorithm.

716 717
        //  Find the peer endpoint.
        endpoint_t peer = find_endpoint (addr_);
718

719
        // The total HWM for an inproc connection should be the sum of
720
        // the binder's HWM and the connector's HWM.
Martin Hurton's avatar
Martin Hurton committed
721
        int sndhwm = 0;
722 723 724
        if (peer.socket == NULL)
            sndhwm = options.sndhwm;
        else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
725
            sndhwm = options.sndhwm + peer.options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
726
        int rcvhwm = 0;
727 728
        if (peer.socket == NULL)
            rcvhwm = options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
729 730
        else
        if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
731
            rcvhwm = options.rcvhwm + peer.options.sndhwm;
732

733
        //  Create a bi-directional pipe to connect the peers.
734
        object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
735
        pipe_t *new_pipes [2] = {NULL, NULL};
736 737 738 739 740 741 742 743 744 745

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
        bool conflates [2] = {conflate, conflate};
746
        rc = pipepair (parents, new_pipes, hwms, conflates);
747 748 749 750 751
        if (!conflate) {
            new_pipes[0]->set_hwms_boost(peer.options.sndhwm, peer.options.rcvhwm);
            new_pipes[1]->set_hwms_boost(options.sndhwm, options.rcvhwm);
        }

752
        errno_assert (rc == 0);
753

754 755 756 757 758 759 760 761 762 763 764 765 766 767
        if (!peer.socket) {
            //  The peer doesn't exist yet so we don't know whether
            //  to send the identity message or not. To resolve this,
            //  we always send our identity and drop it later if
            //  the peer doesn't expect it.
            msg_t id;
            rc = id.init_size (options.identity_size);
            errno_assert (rc == 0);
            memcpy (id.data (), options.identity, options.identity_size);
            id.set_flags (msg_t::identity);
            bool written = new_pipes [0]->write (&id);
            zmq_assert (written);
            new_pipes [0]->flush ();

Martin Hurton's avatar
Martin Hurton committed
768 769
            const endpoint_t endpoint = {this, options};
            pend_connection (std::string (addr_), endpoint, new_pipes);
770
        }
Martin Hurton's avatar
Martin Hurton committed
771
        else {
772 773 774 775 776 777 778 779 780 781 782
            //  If required, send the identity of the local socket to the peer.
            if (peer.options.recv_identity) {
                msg_t id;
                rc = id.init_size (options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), options.identity, options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [0]->write (&id);
                zmq_assert (written);
                new_pipes [0]->flush ();
            }
783

784 785 786 787 788 789 790 791 792 793 794
            //  If required, send the identity of the peer to the local socket.
            if (options.recv_identity) {
                msg_t id;
                rc = id.init_size (peer.options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), peer.options.identity, peer.options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [1]->write (&id);
                zmq_assert (written);
                new_pipes [1]->flush ();
            }
795

796 797 798 799 800
            //  Attach remote end of the pipe to the peer socket. Note that peer's
            //  seqnum was incremented in find_endpoint function. We don't need it
            //  increased here.
            send_bind (peer.socket, new_pipes [1], false);
        }
801

Martin Hurton's avatar
Martin Hurton committed
802 803 804
        //  Attach local end of the pipe to this socket object.
        attach_pipe (new_pipes [0]);

805
        // Save last endpoint URI
806
        last_endpoint.assign (addr_);
807

808
        // remember inproc connections for disconnect
Martin Hurton's avatar
Martin Hurton committed
809
        inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
810

811
        options.connected = true;
812 813
        return 0;
    }
814 815 816 817
    bool is_single_connect = (options.type == ZMQ_DEALER ||
                              options.type == ZMQ_SUB ||
                              options.type == ZMQ_REQ);
    if (unlikely (is_single_connect)) {
Martin Hurton's avatar
Martin Hurton committed
818
        const endpoints_t::iterator it = endpoints.find (addr_);
819 820 821 822 823 824 825
        if (it != endpoints.end ()) {
            // There is no valid use for multiple connects for SUB-PUB nor
            // DEALER-ROUTER nor REQ-REP. Multiple connects produces
            // nonsensical results.
            return 0;
        }
    }
826

827 828 829 830 831 832 833
    //  Choose the I/O thread to run the session in.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        return -1;
    }

Ilya Kulakov's avatar
Ilya Kulakov committed
834
    address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ());
835
    alloc_assert (paddr);
836 837 838

    //  Resolve address (if needed by the protocol)
    if (protocol == "tcp") {
839 840 841
        //  Do some basic sanity checks on tcp:// address syntax
        //  - hostname starts with digit or letter, with embedded '-' or '.'
        //  - IPv6 address may contain hex chars and colons.
842 843
        //  - IPv6 link local address may contain % followed by interface name / zone_id
        //    (Reference: https://tools.ietf.org/html/rfc4007)
844 845 846 847 848 849
        //  - IPv4 address may contain decimal digits and dots.
        //  - Address must end in ":port" where port is *, or numeric
        //  - Address may contain two parts separated by ':'
        //  Following code is quick and dirty check to catch obvious errors,
        //  without trying to be fully accurate.
        const char *check = address.c_str ();
850
        if (isalnum (*check) || isxdigit (*check) || *check == '[' || *check == ':') {
851 852 853
            check++;
            while (isalnum  (*check)
                || isxdigit (*check)
854
                || *check == '.' || *check == '-' || *check == ':' || *check == '%'
855
                || *check == ';' || *check == '['  || *check == ']' || *check == '_'
856
                || *check == '*'
857
            ) {
858
                check++;
859
            }
860 861 862 863 864 865 866 867 868 869 870 871 872 873
        }
        //  Assume the worst, now look for success
        rc = -1;
        //  Did we reach the end of the address safely?
        if (*check == 0) {
            //  Do we have a valid port string? (cannot be '*' in connect
            check = strrchr (address.c_str (), ':');
            if (check) {
                check++;
                if (*check && (isdigit (*check)))
                    rc = 0;     //  Valid
            }
        }
        if (rc == -1) {
874
            errno = EINVAL;
875
            LIBZMQ_DELETE(paddr);
876 877 878
            return -1;
        }
        //  Defer resolution until a socket is opened
879
        paddr->resolved.tcp_addr = NULL;
880
    }
881
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
882 883
    else
    if (protocol == "ipc") {
884
        paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
885
        alloc_assert (paddr->resolved.ipc_addr);
886 887
        int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
        if (rc != 0) {
888
            LIBZMQ_DELETE(paddr);
889 890 891
            return -1;
        }
    }
892
#endif
893

894
if (protocol  == "udp") {
895 896
    if (options.type != ZMQ_RADIO) {
        errno = ENOCOMPATPROTO;
897
        LIBZMQ_DELETE(paddr);
898 899 900
        return -1;
    }

901 902
    paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
    alloc_assert (paddr->resolved.udp_addr);
903
    rc = paddr->resolved.udp_addr->resolve (address.c_str(), false);
904 905 906 907 908 909
    if (rc != 0) {
        LIBZMQ_DELETE(paddr);
        return -1;
    }
}

bebopagogo's avatar
bebopagogo committed
910
// TBD - Should we check address for ZMQ_HAVE_NORM???
911

912 913 914 915 916 917 918
#ifdef ZMQ_HAVE_OPENPGM
    if (protocol == "pgm" || protocol == "epgm") {
        struct pgm_addrinfo_t *res = NULL;
        uint16_t port_number = 0;
        int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number);
        if (res != NULL)
            pgm_freeaddrinfo (res);
919 920 921
        if (rc != 0 || port_number == 0) {
          return -1;
        }
922
    }
923
#endif
924
#if defined ZMQ_HAVE_TIPC
925 926 927 928 929 930
    else
    if (protocol == "tipc") {
        paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
        alloc_assert (paddr->resolved.tipc_addr);
        int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
        if (rc != 0) {
931
            LIBZMQ_DELETE(paddr);
932 933 934 935
            return -1;
        }
    }
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
936 937 938 939 940 941 942 943 944 945 946 947
#if defined ZMQ_HAVE_VMCI
    else
    if (protocol == "vmci") {
        paddr->resolved.vmci_addr = new (std::nothrow) vmci_address_t (this->get_ctx ());
        alloc_assert (paddr->resolved.vmci_addr);
        int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(paddr);
            return -1;
        }
    }
#endif
948

949
    //  Create session.
950
    session_base_t *session = session_base_t::create (io_thread, true, this,
951
        options, paddr);
952
    errno_assert (session);
953

954
    //  PGM does not support subscription forwarding; ask for all data to be
bebopagogo's avatar
bebopagogo committed
955
    //  sent to this pipe. (same for NORM, currently?)
956
    bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp";
957
    pipe_t *newpipe = NULL;
958

959
    if (options.immediate != 1 || subscribe_to_all) {
960 961
        //  Create a bi-directional pipe.
        object_t *parents [2] = {this, session};
962
        pipe_t *new_pipes [2] = {NULL, NULL};
963 964 965 966 967 968 969 970 971 972 973

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : options.sndhwm,
            conflate? -1 : options.rcvhwm};
        bool conflates [2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
974
        rc = pipepair (parents, new_pipes, hwms, conflates);
975
        errno_assert (rc == 0);
976

977
        //  Attach local end of the pipe to the socket object.
978
        attach_pipe (new_pipes [0], subscribe_to_all);
979
        newpipe = new_pipes [0];
Martin Sustrik's avatar
Martin Sustrik committed
980

981
        //  Attach remote end of the pipe to the session object later on.
982
        session->attach_pipe (new_pipes [1]);
983 984 985
    }

    //  Save last endpoint URI
986
    paddr->to_string (last_endpoint);
987

988
    add_endpoint (addr_, (own_t *) session, newpipe);
989 990 991
    return 0;
}

992
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
993
{
994
    //  Activate the session. Make it a child of this socket.
995
    launch_child (endpoint_);
Martin Hurton's avatar
Martin Hurton committed
996
    endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe)));
997 998 999 1000
}

int zmq::socket_base_t::term_endpoint (const char *addr_)
{
1001
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
somdoron's avatar
somdoron committed
1002

1003 1004 1005 1006 1007
    //  Check whether the library haven't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }
malosek's avatar
malosek committed
1008

1009
    //  Check whether endpoint address passed to the function is valid.
1010 1011 1012 1013 1014
    if (unlikely (!addr_)) {
        errno = EINVAL;
        return -1;
    }

1015 1016 1017
    //  Process pending commands, if any, since there could be pending unprocessed process_own()'s
    //  (from launch_child() for example) we're asked to terminate now.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
1018
    if (unlikely(rc != 0)) {
1019
        return -1;
somdoron's avatar
somdoron committed
1020
    }
1021

1022 1023 1024
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
1025
    if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) {
1026
        return -1;
somdoron's avatar
somdoron committed
1027
    }
1028 1029 1030

    // Disconnect an inproc socket
    if (protocol == "inproc") {
somdoron's avatar
somdoron committed
1031
        if (unregister_endpoint (std::string(addr_), this) == 0) {
Martin Hurton's avatar
Martin Hurton committed
1032
            return 0;
somdoron's avatar
somdoron committed
1033
        }
1034 1035 1036 1037 1038
        std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
        if (range.first == range.second) {
            errno = ENOENT;
            return -1;
        }
Martin Hurton's avatar
Martin Hurton committed
1039

1040
        for (inprocs_t::iterator it = range.first; it != range.second; ++it)
Martin Hurton's avatar
Martin Hurton committed
1041
            it->second->terminate (true);
1042 1043 1044 1045
        inprocs.erase (range.first, range.second);
        return 0;
    }

1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075
    std::string resolved_addr = std::string (addr_);
    std::pair <endpoints_t::iterator, endpoints_t::iterator> range;

    // The resolved last_endpoint is used as a key in the endpoints map.
    // The address passed by the user might not match in the TCP case due to
    // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
    // resolve before giving up. Given at this stage we don't know whether a
    // socket is connected or bound, try with both.
    if (protocol == "tcp") {
        range = endpoints.equal_range (resolved_addr);
        if (range.first == range.second) {
            tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
            alloc_assert (tcp_addr);
            rc = tcp_addr->resolve (address.c_str (), false, options.ipv6);

            if (rc == 0) {
                tcp_addr->to_string (resolved_addr);
                range = endpoints.equal_range (resolved_addr);

                if (range.first == range.second) {
                    rc = tcp_addr->resolve (address.c_str (), true, options.ipv6);
                    if (rc == 0) {
                        tcp_addr->to_string (resolved_addr);
                    }
                }
            }
            LIBZMQ_DELETE(tcp_addr);
        }
    }

1076
    //  Find the endpoints range (if any) corresponding to the addr_ string.
1077
    range = endpoints.equal_range (resolved_addr);
1078 1079
    if (range.first == range.second) {
        errno = ENOENT;
1080
        return -1;
1081
    }
1082

1083 1084 1085
    for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
        //  If we have an associated pipe, terminate it.
        if (it->second.second != NULL)
Martin Hurton's avatar
Martin Hurton committed
1086
            it->second.second->terminate (false);
1087
        term_child (it->second.first);
1088
    }
1089
    endpoints.erase (range.first, range.second);
1090
    return 0;
1091 1092
}

1093
//  Sends msg_ via the socket-type specific xsend.
//  flags_ is checked for ZMQ_SNDMORE (marks the message with msg_t::more)
//  and ZMQ_DONTWAIT (never wait). Returns 0 on success, -1 on failure;
//  errno is set here to ETERM (context terminated), EFAULT (invalid message)
//  or EAGAIN (send timeout expired), otherwise it is whatever xsend or
//  process_commands left behind.
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);

    //  Refuse to work once the context has been terminated.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  The message handed in must be valid.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Process pending commands, if any (throttled).
    int rc = process_commands (0, true);
    if (unlikely (rc != 0))
        return -1;

    //  Drop any user-visible flag already present on the message, then
    //  impose the flag requested by the caller.
    msg_->reset_flags (msg_t::more);
    if (flags_ & ZMQ_SNDMORE)
        msg_->set_flags (msg_t::more);

    msg_->reset_metadata ();

    //  First attempt, using the method of the concrete socket class.
    rc = xsend (msg_);
    if (rc == 0)
        return 0;
    if (unlikely (errno != EAGAIN))
        return -1;

    //  For a non-blocking send the error - including EAGAIN - is simply
    //  propagated up the stack.
    const bool non_blocking = flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0;
    if (non_blocking)
        return -1;

    //  Work out when the timeout expires. A negative timeout means
    //  "wait forever", in which case the deadline is never consulted.
    int timeout = options.sndtimeo;
    const uint64_t deadline = timeout < 0 ? 0 : (clock.now_ms () + timeout);

    //  The message could not be sent. Wait for the next command, process it
    //  and retry; give up with EAGAIN once the deadline has passed.
    for (;;) {
        if (unlikely (process_commands (timeout, false) != 0))
            return -1;

        rc = xsend (msg_);
        if (rc == 0)
            return 0;
        if (unlikely (errno != EAGAIN))
            return -1;

        if (timeout > 0) {
            timeout = (int) (deadline - clock.now_ms ());
            if (timeout <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
    }
}

1169
//  Receives a message into msg_ via the socket-type specific xrecv.
//  flags_ is checked for ZMQ_DONTWAIT (never wait). Returns 0 on success,
//  -1 on failure; errno is set here to ETERM (context terminated), EFAULT
//  (invalid message) or EAGAIN (receive timeout expired), otherwise it is
//  whatever xrecv or process_commands left behind.
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);

    //  Refuse to work once the context has been terminated.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  The message handed in must be valid.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Every inbound_poll_rate messages, check for signals and process
    //  incoming commands. This only matters when we never end up polling
    //  because messages are available all the time; whenever a poll does
    //  occur, ticks is reset to zero and this code is bypassed.
    //
    //  Note that 'recv' therefore throttles command processing differently
    //  from 'send': counting ticks is cheaper than doing RDTSC every time.
    ++ticks;
    if (ticks == inbound_poll_rate) {
        if (unlikely (process_commands (0, false) != 0))
            return -1;
        ticks = 0;
    }

    //  Try to get a message right away.
    int rc = xrecv (msg_);
    if (rc == 0) {
        extract_flags (msg_);
        return 0;
    }
    if (unlikely (errno != EAGAIN))
        return -1;

    //  No message is available immediately. For a non-blocking recv, process
    //  commands once in case an activate_reader command is already waiting
    //  in the command pipe, then retry a single time.
    const bool non_blocking = flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0;
    if (non_blocking) {
        if (unlikely (process_commands (0, false) != 0))
            return -1;
        ticks = 0;

        rc = xrecv (msg_);
        if (rc < 0)
            return rc;

        extract_flags (msg_);
        return 0;
    }

    //  Work out when the timeout expires. A negative timeout means
    //  "wait forever", in which case the deadline is never consulted.
    int timeout = options.rcvtimeo;
    const uint64_t deadline = timeout < 0 ? 0 : (clock.now_ms () + timeout);

    //  Blocking case: keep processing commands until a message can be
    //  fetched. The first pass waits only if ticks is non-zero.
    bool block = (ticks != 0);
    for (;;) {
        const int wait = block ? timeout : 0;
        if (unlikely (process_commands (wait, false) != 0))
            return -1;

        rc = xrecv (msg_);
        if (rc == 0) {
            ticks = 0;
            break;
        }
        if (unlikely (errno != EAGAIN))
            return -1;

        block = true;
        if (timeout > 0) {
            timeout = (int) (deadline - clock.now_ms ());
            if (timeout <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
    }

    extract_flags (msg_);
    return 0;
}

int zmq::socket_base_t::close ()
{
1267
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
1268

1269 1270 1271 1272
    //  Remove all existing signalers for thread safe sockets
    if (thread_safe)
        ((mailbox_safe_t*)mailbox)->clear_signalers();

1273 1274
    //  Mark the socket as dead
    tag = 0xdeadbeef;
1275

1276

1277 1278 1279 1280
    //  Transfer the ownership of the socket from this application thread
    //  to the reaper thread which will take care of the rest of shutdown
    //  process.
    send_reap (this);
1281

1282 1283 1284
    return 0;
}

1285 1286 1287 1288 1289 1290 1291 1292 1293 1294
//  Forwards to the socket-type specific xhas_in and returns its answer.
bool zmq::socket_base_t::has_in ()
{
    const bool result = xhas_in ();
    return result;
}

//  Forwards to the socket-type specific xhas_out and returns its answer.
bool zmq::socket_base_t::has_out ()
{
    const bool result = xhas_out ();
    return result;
}

1295
//  Registers the socket with the reaper's poller_ and starts termination.
//  A non-thread-safe socket is polled through its mailbox descriptor; a
//  thread-safe one gets a dedicated signaler that is added to the mailbox.
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
    //  Plug the socket to the reaper thread.
    poller = poller_;

    fd_t fd;

    if (thread_safe) {
        scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);

        reaper_signaler = new (std::nothrow) signaler_t ();
        zmq_assert (reaper_signaler);

        //  Add the signaler to the safe mailbox.
        fd = reaper_signaler->get_fd ();
        mailbox_safe_t *safe_mailbox = (mailbox_safe_t *) mailbox;
        safe_mailbox->add_signaler (reaper_signaler);

        //  Signal once so that the reaper handles already queued commands.
        reaper_signaler->send ();
    }
    else {
        mailbox_t *plain_mailbox = (mailbox_t *) mailbox;
        fd = plain_mailbox->get_fd ();
    }

    handle = poller->add_fd (fd, this);
    poller->set_pollin (handle);

    //  Initialise the termination and check whether the socket can be
    //  deallocated immediately.
    terminate ();
    check_destroy ();
}

1328
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
Martin Sustrik's avatar
Martin Sustrik committed
1329
{
1330
    int rc;
1331
    command_t cmd;
1332 1333 1334
    if (timeout_ != 0) {

        //  If we are asked to wait, simply ask mailbox to wait.
somdoron's avatar
somdoron committed
1335
        rc = mailbox->recv (&cmd, timeout_);
1336 1337
    }
    else {
1338

1339 1340 1341
        //  If we are asked not to wait, check whether we haven't processed
        //  commands recently, so that we can throttle the new commands.

Martin Sustrik's avatar
Martin Sustrik committed
1342
        //  Get the CPU's tick counter. If 0, the counter is not available.
Martin Hurton's avatar
Martin Hurton committed
1343
        const uint64_t tsc = zmq::clock_t::rdtsc ();
Martin Sustrik's avatar
Martin Sustrik committed
1344

1345 1346 1347 1348 1349 1350
        //  Optimised version of command processing - it doesn't have to check
        //  for incoming commands each time. It does so only if certain time
        //  elapsed since last command processing. Command delay varies
        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
        //  etc. The optimisation makes sense only on platforms where getting
        //  a timestamp is a very cheap operation (tens of nanoseconds).
Martin Sustrik's avatar
Martin Sustrik committed
1351 1352
        if (tsc && throttle_) {

Martin Sustrik's avatar
Martin Sustrik committed
1353 1354 1355
            //  Check whether TSC haven't jumped backwards (in case of migration
            //  between CPU cores) and whether certain time have elapsed since
            //  last command processing. If it didn't do nothing.
Martin Sustrik's avatar
Martin Sustrik committed
1356
            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
1357
                return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1358
            last_tsc = tsc;
1359 1360 1361
        }

        //  Check whether there are any commands pending for this thread.
somdoron's avatar
somdoron committed
1362
        rc = mailbox->recv (&cmd, 0);
1363
    }
Martin Sustrik's avatar
Martin Sustrik committed
1364

1365 1366
    //  Process all available commands.
    while (rc == 0) {
1367
        cmd.destination->process_command (cmd);
somdoron's avatar
somdoron committed
1368
        rc = mailbox->recv (&cmd, 0);
1369 1370 1371 1372 1373 1374
    }

    if (errno == EINTR)
        return -1;

    zmq_assert (errno == EAGAIN);
1375

1376
    if (ctx_terminated) {
1377 1378
        errno = ETERM;
        return -1;
1379
    }
1380 1381

    return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1382 1383
}

1384
void zmq::socket_base_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
1385
{
1386
    //  Here, someone have called zmq_ctx_term while the socket was still alive.
1387
    //  We'll remember the fact so that any blocking call is interrupted and any
1388 1389
    //  further attempt to use the socket will return ETERM. The user is still
    //  responsible for calling zmq_close on the socket though!
1390 1391 1392
    scoped_lock_t lock(monitor_sync);
    stop_monitor ();
    
1393
    ctx_terminated = true;
Martin Sustrik's avatar
Martin Sustrik committed
1394 1395
}

1396
//  Handles the bind command by attaching the supplied pipe to this socket.
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
{
    attach_pipe (pipe_);
}

1401
void zmq::socket_base_t::process_term (int linger_)
1402
{
1403 1404 1405 1406 1407
    //  Unregister all inproc endpoints associated with this socket.
    //  Doing this we make sure that no new pipes from other sockets (inproc)
    //  will be initiated.
    unregister_endpoints (this);

1408 1409
    //  Ask all attached pipes to terminate.
    for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
1410
        pipes [i]->terminate (false);
Martin Sustrik's avatar
Martin Sustrik committed
1411
    register_term_acks ((int) pipes.size ());
1412

1413
    //  Continue the termination process immediately.
1414
    own_t::process_term (linger_);
1415 1416
}

1417 1418
void zmq::socket_base_t::update_pipe_options(int option_)
{
1419 1420 1421 1422 1423
    if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM)
    {
        for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
        {
            pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
1424
            pipes[i]->send_hwms_to_peer(options.sndhwm, options.rcvhwm);
1425 1426
        }
    }
1427 1428 1429

}

1430 1431 1432 1433 1434
//  Handles the destroy command by only setting the destroyed flag; the
//  flag is acted upon in check_destroy.
void zmq::socket_base_t::process_destroy ()
{
    destroyed = true;
}

1435
//  Fallback implementation in socket_base_t: no option is accepted, so the
//  call always fails with EINVAL.
int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
{
    errno = EINVAL;
    return -1;
}
1440 1441 1442 1443 1444 1445

//  Fallback implementation in socket_base_t: always answers false.
bool zmq::socket_base_t::xhas_out ()
{
    return false;
}

1446
//  Fallback implementation in socket_base_t: sending is not supported, so
//  the call always fails with ENOTSUP.
int zmq::socket_base_t::xsend (msg_t *)
{
    errno = ENOTSUP;
    return -1;
}

//  Fallback implementation in socket_base_t: always answers false.
bool zmq::socket_base_t::xhas_in ()
{
    return false;
}

somdoron's avatar
somdoron committed
1457 1458
int zmq::socket_base_t::xjoin (const char *group_)
{
1459
    LIBZMQ_UNUSED (group_);
somdoron's avatar
somdoron committed
1460 1461 1462 1463 1464 1465
    errno = ENOTSUP;
    return -1;
}

int zmq::socket_base_t::xleave (const char *group_)
{
1466
    LIBZMQ_UNUSED (group_);
somdoron's avatar
somdoron committed
1467 1468 1469 1470
    errno = ENOTSUP;
    return -1;
}

1471
//  Fallback implementation in socket_base_t: receiving is not supported, so
//  the call always fails with ENOTSUP.
int zmq::socket_base_t::xrecv (msg_t *)
{
    errno = ENOTSUP;
    return -1;
}

1477 1478 1479 1480 1481
//  Fallback implementation in socket_base_t: returns a default-constructed
//  blob.
zmq::blob_t zmq::socket_base_t::get_credential () const
{
    return blob_t ();
}

1482
//  Fallback implementation in socket_base_t: fails the assertion
//  unconditionally when called.
void zmq::socket_base_t::xread_activated (pipe_t *)
{
    zmq_assert (false);
}
1486
//  Fallback implementation in socket_base_t: fails the assertion
//  unconditionally when called.
void zmq::socket_base_t::xwrite_activated (pipe_t *)
{
    zmq_assert (false);
}

1491
//  Fallback implementation in socket_base_t: fails the assertion
//  unconditionally when called.
void zmq::socket_base_t::xhiccuped (pipe_t *)
{
    zmq_assert (false);
}

1496 1497
void zmq::socket_base_t::in_event ()
{
1498 1499 1500 1501
    //  This function is invoked only once the socket is running in the context
    //  of the reaper thread. Process any commands from other threads/sockets
    //  that may be available at the moment. Ultimately, the socket will
    //  be destroyed.
1502 1503
  {
    scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);
1504

1505 1506 1507
    //  If the socket is thread safe we need to unsignal the reaper signaler
    if (thread_safe)
        reaper_signaler->recv();
1508

1509
    process_commands (0, false);
1510
  }
1511
    check_destroy();
1512 1513 1514 1515 1516 1517 1518
}

//  Poller callback for outgoing readiness; fails the assertion
//  unconditionally when called.
void zmq::socket_base_t::out_event ()
{
    zmq_assert (false);
}

1519
void zmq::socket_base_t::timer_event (int)
1520 1521 1522
{
    zmq_assert (false);
}
1523

1524 1525
void zmq::socket_base_t::check_destroy ()
{
1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541
    //  If the object was already marked as destroyed, finish the deallocation.
    if (destroyed) {

        //  Remove the socket from the reaper's poller.
        poller->rm_fd (handle);

        //  Remove the socket from the context.
        destroy_socket (this);

        //  Notify the reaper about the fact.
        send_reaped ();

        //  Deallocate.
        own_t::process_destroy ();
    }
}
1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552

//  Pipe callback: passes the notification on to the socket-type specific
//  xread_activated.
void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
    xread_activated (pipe_);
}

//  Pipe callback: passes the notification on to the socket-type specific
//  xwrite_activated.
void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
    xwrite_activated (pipe_);
}

1553 1554
//  Pipe callback for a hiccup. With options.immediate set to 1 the pipe is
//  terminated; otherwise the socket-type specific xhiccuped is notified.
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
{
    if (options.immediate != 1) {
        //  Notify derived sockets of the hiccup.
        xhiccuped (pipe_);
        return;
    }

    pipe_->terminate (false);
}

1562
//  Pipe callback invoked when pipe_ has terminated: the socket type is
//  notified, the pipe is removed from the inproc and attached-pipe
//  containers, and one term ack is confirmed if the socket is terminating.
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
{
    //  Notify the specific socket type about the pipe termination.
    xpipe_terminated (pipe_);

    //  Remove the pipe from the inproc pipes (first matching entry only).
    inprocs_t::iterator pos = inprocs.begin ();
    while (pos != inprocs.end () && pos->second != pipe_)
        ++pos;
    if (pos != inprocs.end ())
        inprocs.erase (pos);

    //  Remove the pipe from the list of attached pipes and confirm its
    //  termination if we are already shutting down.
    pipes.erase (pipe_);
    if (is_terminating ())
        unregister_term_ack ();
}

1581 1582
//  Inspects the flags of a received message: asserts that an identity
//  message is allowed by options.recv_identity and records whether the
//  MORE flag is set in rcvmore.
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
    //  Test whether IDENTITY flag is valid for this socket type.
    if (unlikely (msg_->flags () & msg_t::identity))
        zmq_assert (options.recv_identity);

    //  Remember the MORE flag.
    if (msg_->flags () & msg_t::more)
        rcvmore = true;
    else
        rcvmore = false;
}
1590

1591
int zmq::socket_base_t::monitor (const char *addr_, int events_)
1592
{
1593 1594
    scoped_lock_t lock(monitor_sync);
    
1595 1596 1597 1598
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }
1599

1600
    //  Support deregistering monitoring endpoints as well
1601 1602 1603 1604 1605 1606 1607
    if (addr_ == NULL) {
        stop_monitor ();
        return 0;
    }
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
Pieter Hintjens's avatar
Pieter Hintjens committed
1608
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
1609 1610
        return -1;

1611
    //  Event notification only supported over inproc://
1612 1613 1614 1615
    if (protocol != "inproc") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
1616 1617 1618 1619
    // already monitoring. Stop previous monitor before starting new one.
    if (monitor_socket != NULL) {
        stop_monitor (true);
    }
1620
    //  Register events to monitor
1621
    monitor_events = events_;
Martin Hurton's avatar
Martin Hurton committed
1622
    monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
1623 1624 1625
    if (monitor_socket == NULL)
        return -1;

1626
    //  Never block context termination on pending event messages
1627
    int linger = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
1628
    int rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1629
    if (rc == -1)
1630
        stop_monitor (false);
1631

1632
    //  Spawn the monitor socket endpoint
1633 1634
    rc = zmq_bind (monitor_socket, addr_);
    if (rc == -1)
1635
         stop_monitor (false);
1636 1637 1638
    return rc;
}

1639
void zmq::socket_base_t::event_connected (const std::string &addr_, zmq::fd_t fd_)
1640
{
1641
    event(addr_, fd_, ZMQ_EVENT_CONNECTED);
1642
}
1643

Martin Hurton's avatar
Martin Hurton committed
1644
void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_)
1645
{
1646
    event(addr_, err_, ZMQ_EVENT_CONNECT_DELAYED);
1647
}
1648

Martin Hurton's avatar
Martin Hurton committed
1649
void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_)
1650
{
1651
    event(addr_, interval_, ZMQ_EVENT_CONNECT_RETRIED);
1652 1653
}

1654
void zmq::socket_base_t::event_listening (const std::string &addr_, zmq::fd_t fd_)
1655
{
1656
    event(addr_, fd_, ZMQ_EVENT_LISTENING);
1657 1658
}

Martin Hurton's avatar
Martin Hurton committed
1659
void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
1660
{
1661
    event(addr_, err_, ZMQ_EVENT_BIND_FAILED);
1662 1663
}

1664
void zmq::socket_base_t::event_accepted (const std::string &addr_, zmq::fd_t fd_)
1665
{
1666
    event(addr_, fd_, ZMQ_EVENT_ACCEPTED);
1667 1668
}

Martin Hurton's avatar
Martin Hurton committed
1669
void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_)
1670
{
1671
    event(addr_, err_, ZMQ_EVENT_ACCEPT_FAILED);
1672 1673
}

1674
void zmq::socket_base_t::event_closed (const std::string &addr_, zmq::fd_t fd_)
1675
{
1676
    event(addr_, fd_, ZMQ_EVENT_CLOSED);
1677
}
Martin Hurton's avatar
Martin Hurton committed
1678 1679

void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
1680
{
1681
    event(addr_, err_, ZMQ_EVENT_CLOSE_FAILED);
1682 1683
}

1684
void zmq::socket_base_t::event_disconnected (const std::string &addr_, zmq::fd_t fd_)
1685
{
1686 1687 1688
    event(addr_, fd_, ZMQ_EVENT_DISCONNECTED);
}

1689 1690
void zmq::socket_base_t::event_handshake_failed_no_detail (
  const std::string &addr_, int err_)
Vincent Tellier's avatar
Vincent Tellier committed
1691
{
1692
    event (addr_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
Vincent Tellier's avatar
Vincent Tellier committed
1693 1694
}

1695 1696
void zmq::socket_base_t::event_handshake_failed_zmtp (const std::string &addr_,
                                                      int err_)
Vincent Tellier's avatar
Vincent Tellier committed
1697
{
1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716
    event (addr_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_ZMTP);
}

//  Reports ZMQ_EVENT_HANDSHAKE_FAILED_ZAP for addr_, with err_ as the event
//  value.
void zmq::socket_base_t::event_handshake_failed_zap (const std::string &addr_,
                                                     int err_)
{
    event (addr_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_ZAP);
}

//  Reports ZMQ_EVENT_HANDSHAKE_FAILED_ENCRYPTION for addr_, with err_ as the
//  event value.
void zmq::socket_base_t::event_handshake_failed_encryption (
  const std::string &addr_, int err_)
{
    event (addr_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_ENCRYPTION);
}

//  Reports ZMQ_EVENT_HANDSHAKE_SUCCEEDED for the endpoint addr_ by
//  forwarding to event (); err_ is passed through unchanged as the event
//  value.
void zmq::socket_base_t::event_handshake_succeeded (const std::string &addr_,
                                                    int err_)
{
    const intptr_t value = static_cast<intptr_t> (err_);
    event (addr_, value, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
}

1719
void zmq::socket_base_t::event(const std::string &addr_, intptr_t value_, int type_)
1720 1721 1722 1723
{
    scoped_lock_t lock(monitor_sync);
    if (monitor_events & type_)
    {
1724
        monitor_event (type_, value_, addr_);
1725
    }
1726 1727
}

1728
//  Send a monitor event
1729
void zmq::socket_base_t::monitor_event (int event_, intptr_t value_, const std::string &addr_)
1730
{
1731 1732 1733
    // this is a private method which is only called from
    // contexts where the mutex has been locked before

1734
    if (monitor_socket) {
1735
        //  Send event in first frame
1736
        zmq_msg_t msg;
1737 1738
        zmq_msg_init_size (&msg, 6);
        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
1739 1740 1741 1742 1743
        //  Avoid dereferencing uint32_t on unaligned address
        uint16_t event = (uint16_t) event_;
        uint32_t value = (uint32_t) value_;
        memcpy (data + 0, &event, sizeof(event));
        memcpy (data + 2, &value, sizeof(value));
1744
        zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
1745 1746

        //  Send address in second frame
1747
        zmq_msg_init_size (&msg, addr_.size());
1748
        memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
1749 1750
        zmq_sendmsg (monitor_socket, &msg, 0);
    }
1751 1752
}

1753
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
1754
{
1755 1756 1757
    // this is a private method which is only called from
    // contexts where the mutex has been locked before

1758
    if (monitor_socket) {
1759
        if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_)
1760
            monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
1761 1762 1763 1764
        zmq_close (monitor_socket);
        monitor_socket = NULL;
        monitor_events = 0;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
1765
}