/*
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include <new>
31
#include <string>
32 33
#include <algorithm>

34
#include "macros.hpp"
35 36 37 38 39
#include "platform.hpp"

#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#if defined _MSC_VER
40
#if defined _WIN32_WCE
boris@boressoft.ru's avatar
boris@boressoft.ru committed
41 42
#include <cmnintrin.h>
#else
43 44
#include <intrin.h>
#endif
boris@boressoft.ru's avatar
boris@boressoft.ru committed
45
#endif
46 47 48
#else
#include <unistd.h>
#endif
49

50
#include "socket_base.hpp"
51
#include "tcp_listener.hpp"
52
#include "ipc_listener.hpp"
53
#include "tipc_listener.hpp"
54
#include "tcp_connecter.hpp"
55
#include "io_thread.hpp"
56
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
57
#include "config.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
58
#include "pipe.hpp"
59
#include "err.hpp"
60
#include "ctx.hpp"
malosek's avatar
malosek committed
61
#include "platform.hpp"
62
#include "likely.hpp"
63
#include "msg.hpp"
64 65 66
#include "address.hpp"
#include "ipc_address.hpp"
#include "tcp_address.hpp"
67
#include "tipc_address.hpp"
somdoron's avatar
somdoron committed
68 69
#include "mailbox.hpp"
#include "mailbox_safe.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
70 71 72 73 74 75

#if defined ZMQ_HAVE_VMCI
#include "vmci_address.hpp"
#include "vmci_listener.hpp"
#endif

76 77 78
#ifdef ZMQ_HAVE_OPENPGM
#include "pgm_socket.hpp"
#endif
79

80 81 82 83 84 85 86
#include "pair.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "pull.hpp"
#include "push.hpp"
87 88
#include "dealer.hpp"
#include "router.hpp"
89 90
#include "xpub.hpp"
#include "xsub.hpp"
91
#include "stream.hpp"
92
#include "server.hpp"
93
#include "client.hpp"
94

somdoron's avatar
somdoron committed
95 96 97 98
//  Lock / unlock the socket-wide mutex, but only when the socket was created
//  as thread-safe. Both macros expect 'thread_safe' and 'sync' to be visible
//  at the point of use and must be followed by a semicolon.
//  Each one expands to exactly one statement (do/while (0) idiom), so it can
//  be used as the unbraced body of an if/else without capturing the 'else'.
#define ENTER_MUTEX() \
    do { \
        if (thread_safe) \
            sync.lock (); \
    } while (0)

#define EXIT_MUTEX() \
    do { \
        if (thread_safe) \
            sync.unlock (); \
    } while (0)

103 104 105 106 107
//  Tells whether this object still carries the magic tag value that the
//  constructor stores in 'tag'.
bool zmq::socket_base_t::check_tag ()
{
    const bool tag_intact = (tag == 0xbaddecaf);
    return tag_intact;
}

108
//  Factory for concrete socket objects.
//  Returns the new socket, or NULL with errno set to EINVAL when 'type_' is
//  not one of the known socket types. NULL is also returned when the socket's
//  mailbox is a mailbox_t whose descriptor equals retired_fd.
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
    uint32_t tid_, int sid_)
{
    socket_base_t *sock = NULL;

    switch (type_) {
        case ZMQ_PAIR:
            sock = new (std::nothrow) pair_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUB:
            sock = new (std::nothrow) pub_t (parent_, tid_, sid_);
            break;
        case ZMQ_SUB:
            sock = new (std::nothrow) sub_t (parent_, tid_, sid_);
            break;
        case ZMQ_REQ:
            sock = new (std::nothrow) req_t (parent_, tid_, sid_);
            break;
        case ZMQ_REP:
            sock = new (std::nothrow) rep_t (parent_, tid_, sid_);
            break;
        case ZMQ_DEALER:
            sock = new (std::nothrow) dealer_t (parent_, tid_, sid_);
            break;
        case ZMQ_ROUTER:
            sock = new (std::nothrow) router_t (parent_, tid_, sid_);
            break;
        case ZMQ_PULL:
            sock = new (std::nothrow) pull_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUSH:
            sock = new (std::nothrow) push_t (parent_, tid_, sid_);
            break;
        case ZMQ_XPUB:
            sock = new (std::nothrow) xpub_t (parent_, tid_, sid_);
            break;
        case ZMQ_XSUB:
            sock = new (std::nothrow) xsub_t (parent_, tid_, sid_);
            break;
        case ZMQ_STREAM:
            sock = new (std::nothrow) stream_t (parent_, tid_, sid_);
            break;
        case ZMQ_SERVER:
            sock = new (std::nothrow) server_t (parent_, tid_, sid_);
            break;
        case ZMQ_CLIENT:
            sock = new (std::nothrow) client_t (parent_, tid_, sid_);
            break;
        default:
            //  Unknown socket type.
            errno = EINVAL;
            return NULL;
    }

    alloc_assert (sock);

    //  The descriptor check applies only when the mailbox really is
    //  a mailbox_t; for any other mailbox type the cast yields NULL.
    mailbox_t *const plain_mailbox = dynamic_cast<mailbox_t*> (sock->mailbox);
    const bool mailbox_unusable =
        plain_mailbox != NULL && plain_mailbox->get_fd () == retired_fd;
    if (!mailbox_unusable)
        return sock;

    //  The destructor asserts on 'destroyed', so flag the object before
    //  disposing of it.
    sock->destroyed = true;
    LIBZMQ_DELETE(sock);
    return NULL;
}

somdoron's avatar
somdoron committed
173
//  Constructs the common part of every socket.
//  parent_      - owning context; also queried for the ZMQ_IPV6 and
//                 ZMQ_BLOCKY defaults.
//  tid_         - thread/slot id forwarded to own_t.
//  sid_         - socket id, stored in options.socket_id.
//  thread_safe_ - when true the socket uses a mailbox_safe_t guarded by
//                 'sync'; otherwise a plain mailbox_t.
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
    own_t (parent_, tid_),
    tag (0xbaddecaf),
    ctx_terminated (false),
    destroyed (false),
    last_tsc (0),
    ticks (0),
    rcvmore (false),
    file_desc(-1),
    monitor_socket (NULL),
    monitor_events (0),
    thread_safe (thread_safe_),
    reaper_signaler (NULL)
{
    options.socket_id = sid_;
    options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
    //  A "blocky" context lingers forever (-1); otherwise do not linger.
    options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;

    //  Allocate the mailbox the same way as every other object in this
    //  file: non-throwing new followed by alloc_assert, so that running out
    //  of memory is reported through the library's usual path instead of
    //  a C++ exception.
    if (thread_safe)
        mailbox = new (std::nothrow) mailbox_safe_t (&sync);
    else
        mailbox = new (std::nothrow) mailbox_t ();
    alloc_assert (mailbox);
}

//  Releases the mailbox and the reaper signaler (if any), stops the monitor
//  and verifies that the socket went through the regular destruction path.
zmq::socket_base_t::~socket_base_t ()
{
    LIBZMQ_DELETE(mailbox);

    if (reaper_signaler != NULL) {
        LIBZMQ_DELETE(reaper_signaler);
    }

    stop_monitor ();

    //  'destroyed' must have been set before the object is deleted.
    zmq_assert (destroyed);
}

somdoron's avatar
somdoron committed
209
//  Accessor for the mailbox object owned by this socket.
zmq::i_mailbox *zmq::socket_base_t::get_mailbox ()
{
    i_mailbox *const own_mailbox = mailbox;
    return own_mailbox;
}

void zmq::socket_base_t::stop ()
{
    //  Called by ctx when it is terminated (zmq_term).
    //  'stop' command is sent from the threads that called zmq_term to
    //  the thread owning the socket. This way, blocking call in the
    //  owner thread can be interrupted.
    send_stop ();
}

223 224 225 226 227 228 229 230 231 232 233 234 235
//  Splits "protocol://address" into its two parts.
//  uri_      - string to split; must not be NULL (asserted).
//  protocol_ - receives the text in front of "://".
//  address_  - receives the text following "://".
//  Returns 0 on success. Returns -1 with errno set to EINVAL when there is
//  no "://" separator or when either part is empty; in the latter case the
//  output strings have already been overwritten.
int zmq::socket_base_t::parse_uri (const char *uri_,
                        std::string &protocol_, std::string &address_)
{
    zmq_assert (uri_ != NULL);

    const std::string full_uri (uri_);
    const std::string::size_type separator_at = full_uri.find ("://");

    if (separator_at != std::string::npos) {
        //  Skip the three characters of the separator itself.
        protocol_.assign (full_uri, 0, separator_at);
        address_.assign (full_uri, separator_at + 3, std::string::npos);

        if (!protocol_.empty () && !address_.empty ())
            return 0;
    }

    errno = EINVAL;
    return -1;
}

244 245
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
246
    //  First check out whether the protocol is something we are aware of.
Pieter Hintjens's avatar
Pieter Hintjens committed
247 248 249 250 251 252
    if (protocol_ != "inproc"
    &&  protocol_ != "ipc"
    &&  protocol_ != "tcp"
    &&  protocol_ != "pgm"
    &&  protocol_ != "epgm"
    &&  protocol_ != "tipc"
Ilya Kulakov's avatar
Ilya Kulakov committed
253 254
    &&  protocol_ != "norm"
    &&  protocol_ != "vmci") {
255 256 257 258
        errno = EPROTONOSUPPORT;
        return -1;
    }
    //  If 0MQ is not compiled with OpenPGM, pgm and epgm transports
259
    //  are not available.
260 261 262 263 264 265
#if !defined ZMQ_HAVE_OPENPGM
    if (protocol_ == "pgm" || protocol_ == "epgm") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif
Martin Hurton's avatar
Martin Hurton committed
266

bebopagogo's avatar
bebopagogo committed
267 268 269 270 271 272
#if !defined ZMQ_HAVE_NORM
    if (protocol_ == "norm") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif // !ZMQ_HAVE_NORM
273 274 275

    //  IPC transport is not available on Windows and OpenVMS.
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
276
    if (protocol_ == "ipc") {
277 278 279 280 281 282
        //  Unknown protocol.
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

283
    // TIPC transport is only available on Linux.
284
#if !defined ZMQ_HAVE_TIPC
Erik Hugne's avatar
Erik Hugne committed
285
    if (protocol_ == "tipc") {
286 287 288 289 290
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

Ilya Kulakov's avatar
Ilya Kulakov committed
291 292 293 294 295 296 297
#if !defined ZMQ_HAVE_VMCI
    if (protocol_ == "vmci") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

298 299 300
    //  Check whether socket type and transport protocol match.
    //  Specifically, multicast protocols can't be combined with
    //  bi-directional messaging patterns (socket types).
bebopagogo's avatar
bebopagogo committed
301
    if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") &&
302 303
          options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
          options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
304 305 306 307 308 309 310 311
        errno = ENOCOMPATPROTO;
        return -1;
    }

    //  Protocol is available.
    return 0;
}

312
//  Takes over a freshly created pipe: registers this socket as its event
//  sink, records it in 'pipes' and hands it to the concrete socket type.
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
    //  Register the pipe first so that it can be terminated later on.
    pipe_->set_event_sink (this);
    pipes.push_back (pipe_);

    //  Notify the derived socket type of the new pipe.
    xattach_pipe (pipe_, subscribe_to_all_);

    if (!is_terminating ())
        return;

    //  The socket is already closing: have the new pipe terminate right
    //  away and expect one more termination acknowledgement for it.
    register_term_acks (1);
    pipe_->terminate (false);
}

329
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
330
    size_t optvallen_)
331
{
332
    ENTER_MUTEX ();
somdoron's avatar
somdoron committed
333

334 335
    if (!options.is_valid(option_)) {
        errno = EINVAL;
336
        EXIT_MUTEX ();
337 338
        return -1;
    }
339

340
    if (unlikely (ctx_terminated)) {
341
        errno = ETERM;
342
        EXIT_MUTEX ();
343 344 345
        return -1;
    }

346 347
    //  First, check whether specific socket type overloads the option.
    int rc = xsetsockopt (option_, optval_, optvallen_);
somdoron's avatar
somdoron committed
348
    if (rc == 0 || errno != EINVAL) {
349
        EXIT_MUTEX ();
350
        return rc;
somdoron's avatar
somdoron committed
351
    }
352 353 354

    //  If the socket type doesn't support the option, pass it to
    //  the generic option parser.
somdoron's avatar
somdoron committed
355
    rc = options.setsockopt (option_, optval_, optvallen_);
356
    update_pipe_options(option_);
somdoron's avatar
somdoron committed
357

358
    EXIT_MUTEX ();
somdoron's avatar
somdoron committed
359
    return rc;
360 361
}

362 363 364
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
    size_t *optvallen_)
{
365
    ENTER_MUTEX ();
somdoron's avatar
somdoron committed
366

367
    if (unlikely (ctx_terminated)) {
368
        errno = ETERM;
369
        EXIT_MUTEX ();
370 371 372
        return -1;
    }

373
    if (option_ == ZMQ_RCVMORE) {
374
        if (*optvallen_ < sizeof (int)) {
375
            errno = EINVAL;
376
            EXIT_MUTEX ();
377 378
            return -1;
        }
379
        memset(optval_, 0, *optvallen_);
380 381
        *((int*) optval_) = rcvmore ? 1 : 0;
        *optvallen_ = sizeof (int);
382
        EXIT_MUTEX ();
383 384 385
        return 0;
    }

386 387 388
    if (option_ == ZMQ_FD) {
        if (*optvallen_ < sizeof (fd_t)) {
            errno = EINVAL;
389
            EXIT_MUTEX ();
390 391
            return -1;
        }
somdoron's avatar
somdoron committed
392 393 394 395

        if (thread_safe) {
            // thread safe socket doesn't provide file descriptor
            errno = EINVAL;
396
            EXIT_MUTEX ();
somdoron's avatar
somdoron committed
397 398
            return -1;
        }
399

somdoron's avatar
somdoron committed
400
        *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
401 402
        *optvallen_ = sizeof(fd_t);

403
        EXIT_MUTEX ();
404 405 406 407
        return 0;
    }

    if (option_ == ZMQ_EVENTS) {
408
        if (*optvallen_ < sizeof (int)) {
409
            errno = EINVAL;
410
            EXIT_MUTEX ();
411 412
            return -1;
        }
413
        int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
414
        if (rc != 0 && (errno == EINTR || errno == ETERM)) {
415
            EXIT_MUTEX ();
416
            return -1;
somdoron's avatar
somdoron committed
417
        }
418
        errno_assert (rc == 0);
419
        *((int*) optval_) = 0;
420
        if (has_out ())
421
            *((int*) optval_) |= ZMQ_POLLOUT;
422
        if (has_in ())
423 424
            *((int*) optval_) |= ZMQ_POLLIN;
        *optvallen_ = sizeof (int);
425
        EXIT_MUTEX ();
426 427 428
        return 0;
    }

429 430 431
    if (option_ == ZMQ_LAST_ENDPOINT) {
        if (*optvallen_ < last_endpoint.size () + 1) {
            errno = EINVAL;
432
            EXIT_MUTEX ();
433 434 435 436
            return -1;
        }
        strcpy (static_cast <char *> (optval_), last_endpoint.c_str ());
        *optvallen_ = last_endpoint.size () + 1;
437
        EXIT_MUTEX ();
438 439 440
        return 0;
    }

441 442 443
    if (option_ == ZMQ_THREAD_SAFE) {
        if (*optvallen_ < sizeof (int)) {
            errno = EINVAL;
444
            EXIT_MUTEX ();
445 446 447 448 449
            return -1;
        }
        memset(optval_, 0, *optvallen_);
        *((int*) optval_) = thread_safe	? 1 : 0;
        *optvallen_ = sizeof (int);
450
        EXIT_MUTEX ();
451 452 453
        return 0;
    }  

somdoron's avatar
somdoron committed
454
    int rc = options.getsockopt (option_, optval_, optvallen_);
455
    EXIT_MUTEX ();
somdoron's avatar
somdoron committed
456
    return rc;
457 458
}

459 460
//  Registers 's_' with the mailbox of a thread-safe socket.
//  Returns 0 on success, or -1 with errno set to EINVAL when the socket is
//  not thread-safe.
int zmq::socket_base_t::add_signaler(signaler_t *s_)
{
    ENTER_MUTEX ();

    int result = -1;
    if (thread_safe) {
        ((mailbox_safe_t*)mailbox)->add_signaler(s_);
        result = 0;
    }
    else {
        errno = EINVAL;
    }

    EXIT_MUTEX ();
    return result;
}

//  Unregisters 's_' from the mailbox of a thread-safe socket.
//  Returns 0 on success, or -1 with errno set to EINVAL when the socket is
//  not thread-safe.
int zmq::socket_base_t::remove_signaler(signaler_t *s_)
{
    ENTER_MUTEX ();

    int result = -1;
    if (thread_safe) {
        ((mailbox_safe_t*)mailbox)->remove_signaler(s_);
        result = 0;
    }
    else {
        errno = EINVAL;
    }

    EXIT_MUTEX ();
    return result;
}

491 492
int zmq::socket_base_t::bind (const char *addr_)
{
493
    ENTER_MUTEX ();
somdoron's avatar
somdoron committed
494

495
    if (unlikely (ctx_terminated)) {
496
        errno = ETERM;
497
        EXIT_MUTEX ();
498 499 500
        return -1;
    }

501 502
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
503
    if (unlikely (rc != 0)) {
504
        EXIT_MUTEX ();
505
        return -1;
somdoron's avatar
somdoron committed
506
    }
507

508
    //  Parse addr_ string.
509 510
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
511
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
512
        EXIT_MUTEX ();
513
        return -1;
somdoron's avatar
somdoron committed
514
    }
515

Pieter Hintjens's avatar
Pieter Hintjens committed
516
    if (protocol == "inproc") {
517
        const endpoint_t endpoint = { this, options };
Martin Hurton's avatar
Martin Hurton committed
518
        const int rc = register_endpoint (addr_, endpoint);
519
        if (rc == 0) {
Martin Hurton's avatar
Martin Hurton committed
520
            connect_pending (addr_, this);
521
            last_endpoint.assign (addr_);
522
            options.connected = true;
523
        }
524
        EXIT_MUTEX ();
525
        return rc;
526
    }
527

bebopagogo's avatar
bebopagogo committed
528
    if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
529
        //  For convenience's sake, bind can be used interchangeable with
bebopagogo's avatar
bebopagogo committed
530
        //  connect for PGM, EPGM and NORM transports.
531
        EXIT_MUTEX ();
532 533 534 535
        rc = connect (addr_);
        if (rc != -1)
            options.connected = true;
        return rc;
536 537
    }

538
    //  Remaining transports require to be run in an I/O thread, so at this
539 540 541 542
    //  point we'll choose one.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
543
        EXIT_MUTEX ();
544 545
        return -1;
    }
546

547
    if (protocol == "tcp") {
548
        tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
549
            io_thread, this, options);
550
        alloc_assert (listener);
551
        int rc = listener->set_address (address.c_str ());
552
        if (rc != 0) {
553
            LIBZMQ_DELETE(listener);
554
            event_bind_failed (address, zmq_errno());
555
            EXIT_MUTEX ();
556
            return -1;
557
        }
558

559
        // Save last endpoint URI
560
        listener->get_address (last_endpoint);
561

562
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
563
        options.connected = true;
564
        EXIT_MUTEX ();
565 566 567
        return 0;
    }

568
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
569 570 571 572
    if (protocol == "ipc") {
        ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
573
        int rc = listener->set_address (address.c_str ());
574
        if (rc != 0) {
575
            LIBZMQ_DELETE(listener);
576
            event_bind_failed (address, zmq_errno());
577
            EXIT_MUTEX ();
578 579
            return -1;
        }
580

581
        // Save last endpoint URI
582
        listener->get_address (last_endpoint);
583

584
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
585
        options.connected = true;
586
        EXIT_MUTEX ();
587
        return 0;
588
    }
589
#endif
590
#if defined ZMQ_HAVE_TIPC
591 592 593 594 595 596
    if (protocol == "tipc") {
         tipc_listener_t *listener = new (std::nothrow) tipc_listener_t (
              io_thread, this, options);
         alloc_assert (listener);
         int rc = listener->set_address (address.c_str ());
         if (rc != 0) {
597
             LIBZMQ_DELETE(listener);
598
             event_bind_failed (address, zmq_errno());
599
             EXIT_MUTEX ();
600 601 602 603 604 605 606
             return -1;
         }

        // Save last endpoint URI
        listener->get_address (last_endpoint);

        add_endpoint (addr_, (own_t *) listener, NULL);
607
        options.connected = true;
608
        EXIT_MUTEX ();
609 610 611
        return 0;
    }
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
612 613 614 615 616 617 618 619 620
#if defined ZMQ_HAVE_VMCI
    if (protocol == "vmci") {
        vmci_listener_t *listener = new (std::nothrow) vmci_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
        int rc = listener->set_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(listener);
            event_bind_failed (address, zmq_errno ());
621
            EXIT_MUTEX ();
Ilya Kulakov's avatar
Ilya Kulakov committed
622 623 624 625 626 627 628
            return -1;
        }

        listener->get_address (last_endpoint);

        add_endpoint (last_endpoint.c_str(), (own_t *) listener, NULL);
        options.connected = true;
629
        EXIT_MUTEX ();
Ilya Kulakov's avatar
Ilya Kulakov committed
630 631 632
        return 0;
    }
#endif
633

634
    EXIT_MUTEX ();
635
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
636
    return -1;
637 638
}

639
int zmq::socket_base_t::connect (const char *addr_)
640
{
641
    ENTER_MUTEX ();
somdoron's avatar
somdoron committed
642

643
    if (unlikely (ctx_terminated)) {
644
        errno = ETERM;
645
        EXIT_MUTEX ();
646 647 648
        return -1;
    }

649 650
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
651
    if (unlikely (rc != 0)) {
652
        EXIT_MUTEX ();
653
        return -1;
somdoron's avatar
somdoron committed
654
    }
655

malosek's avatar
malosek committed
656
    //  Parse addr_ string.
657 658
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
659
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
660
        EXIT_MUTEX ();
661
        return -1;
somdoron's avatar
somdoron committed
662
    }
malosek's avatar
malosek committed
663

Pieter Hintjens's avatar
Pieter Hintjens committed
664
    if (protocol == "inproc") {
665

666 667 668 669
        //  TODO: inproc connect is specific with respect to creating pipes
        //  as there's no 'reconnect' functionality implemented. Once that
        //  is in place we should follow generic pipe creation algorithm.

670 671
        //  Find the peer endpoint.
        endpoint_t peer = find_endpoint (addr_);
672

673
        // The total HWM for an inproc connection should be the sum of
674
        // the binder's HWM and the connector's HWM.
Martin Hurton's avatar
Martin Hurton committed
675
        int sndhwm = 0;
676 677 678
        if (peer.socket == NULL)
            sndhwm = options.sndhwm;
        else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
679
            sndhwm = options.sndhwm + peer.options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
680
        int rcvhwm = 0;
681 682
        if (peer.socket == NULL)
            rcvhwm = options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
683 684
        else
        if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
685
            rcvhwm = options.rcvhwm + peer.options.sndhwm;
686

687
        //  Create a bi-directional pipe to connect the peers.
688
        object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
689
        pipe_t *new_pipes [2] = {NULL, NULL};
690 691 692 693 694 695 696 697 698 699

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
        bool conflates [2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
700
        int rc = pipepair (parents, new_pipes, hwms, conflates);
701 702 703 704 705
        if (!conflate) {
            new_pipes[0]->set_hwms_boost(peer.options.sndhwm, peer.options.rcvhwm);
            new_pipes[1]->set_hwms_boost(options.sndhwm, options.rcvhwm);
        }

706
        errno_assert (rc == 0);
707

708 709 710 711 712 713 714 715 716 717 718 719 720 721
        if (!peer.socket) {
            //  The peer doesn't exist yet so we don't know whether
            //  to send the identity message or not. To resolve this,
            //  we always send our identity and drop it later if
            //  the peer doesn't expect it.
            msg_t id;
            rc = id.init_size (options.identity_size);
            errno_assert (rc == 0);
            memcpy (id.data (), options.identity, options.identity_size);
            id.set_flags (msg_t::identity);
            bool written = new_pipes [0]->write (&id);
            zmq_assert (written);
            new_pipes [0]->flush ();

Martin Hurton's avatar
Martin Hurton committed
722 723
            const endpoint_t endpoint = {this, options};
            pend_connection (std::string (addr_), endpoint, new_pipes);
724
        }
Martin Hurton's avatar
Martin Hurton committed
725
        else {
726 727 728 729 730 731 732 733 734 735 736
            //  If required, send the identity of the local socket to the peer.
            if (peer.options.recv_identity) {
                msg_t id;
                rc = id.init_size (options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), options.identity, options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [0]->write (&id);
                zmq_assert (written);
                new_pipes [0]->flush ();
            }
737

738 739 740 741 742 743 744 745 746 747 748
            //  If required, send the identity of the peer to the local socket.
            if (options.recv_identity) {
                msg_t id;
                rc = id.init_size (peer.options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), peer.options.identity, peer.options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [1]->write (&id);
                zmq_assert (written);
                new_pipes [1]->flush ();
            }
749

750 751 752 753 754
            //  Attach remote end of the pipe to the peer socket. Note that peer's
            //  seqnum was incremented in find_endpoint function. We don't need it
            //  increased here.
            send_bind (peer.socket, new_pipes [1], false);
        }
755

Martin Hurton's avatar
Martin Hurton committed
756 757 758
        //  Attach local end of the pipe to this socket object.
        attach_pipe (new_pipes [0]);

759
        // Save last endpoint URI
760
        last_endpoint.assign (addr_);
761

762
        // remember inproc connections for disconnect
Martin Hurton's avatar
Martin Hurton committed
763
        inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
764

765
        options.connected = true;
766
        EXIT_MUTEX ();
767 768
        return 0;
    }
769 770 771 772
    bool is_single_connect = (options.type == ZMQ_DEALER ||
                              options.type == ZMQ_SUB ||
                              options.type == ZMQ_REQ);
    if (unlikely (is_single_connect)) {
Martin Hurton's avatar
Martin Hurton committed
773
        const endpoints_t::iterator it = endpoints.find (addr_);
774 775 776 777
        if (it != endpoints.end ()) {
            // There is no valid use for multiple connects for SUB-PUB nor
            // DEALER-ROUTER nor REQ-REP. Multiple connects produces
            // nonsensical results.
778
            EXIT_MUTEX ();
779 780 781
            return 0;
        }
    }
782

783 784 785 786
    //  Choose the I/O thread to run the session in.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
787
        EXIT_MUTEX ();
788 789 790
        return -1;
    }

Ilya Kulakov's avatar
Ilya Kulakov committed
791
    address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ());
792
    alloc_assert (paddr);
793 794 795

    //  Resolve address (if needed by the protocol)
    if (protocol == "tcp") {
796 797 798
        //  Do some basic sanity checks on tcp:// address syntax
        //  - hostname starts with digit or letter, with embedded '-' or '.'
        //  - IPv6 address may contain hex chars and colons.
799 800
        //  - IPv6 link local address may contain % followed by interface name / zone_id
        //    (Reference: https://tools.ietf.org/html/rfc4007)
801 802 803 804 805 806
        //  - IPv4 address may contain decimal digits and dots.
        //  - Address must end in ":port" where port is *, or numeric
        //  - Address may contain two parts separated by ':'
        //  Following code is quick and dirty check to catch obvious errors,
        //  without trying to be fully accurate.
        const char *check = address.c_str ();
807
        if (isalnum (*check) || isxdigit (*check) || *check == '[') {
808 809 810
            check++;
            while (isalnum  (*check)
                || isxdigit (*check)
811 812
                || *check == '.' || *check == '-' || *check == ':' || *check == '%'
                || *check == ';' || *check == ']')
813 814 815 816 817 818 819 820 821 822 823 824 825 826 827
                check++;
        }
        //  Assume the worst, now look for success
        rc = -1;
        //  Did we reach the end of the address safely?
        if (*check == 0) {
            //  Do we have a valid port string? (cannot be '*' in connect
            check = strrchr (address.c_str (), ':');
            if (check) {
                check++;
                if (*check && (isdigit (*check)))
                    rc = 0;     //  Valid
            }
        }
        if (rc == -1) {
828
            errno = EINVAL;
829
            LIBZMQ_DELETE(paddr);
830
            EXIT_MUTEX ();
831 832 833
            return -1;
        }
        //  Defer resolution until a socket is opened
834
        paddr->resolved.tcp_addr = NULL;
835
    }
836
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
837 838
    else
    if (protocol == "ipc") {
839
        paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
840
        alloc_assert (paddr->resolved.ipc_addr);
841 842
        int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
        if (rc != 0) {
843
            LIBZMQ_DELETE(paddr);
844
            EXIT_MUTEX ();
845 846 847
            return -1;
        }
    }
848
#endif
849

bebopagogo's avatar
bebopagogo committed
850
// TBD - Should we check address for ZMQ_HAVE_NORM???
851

852 853 854 855 856 857 858
#ifdef ZMQ_HAVE_OPENPGM
    if (protocol == "pgm" || protocol == "epgm") {
        struct pgm_addrinfo_t *res = NULL;
        uint16_t port_number = 0;
        int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number);
        if (res != NULL)
            pgm_freeaddrinfo (res);
859
        if (rc != 0 || port_number == 0) {
860
          EXIT_MUTEX ();
861 862
          return -1;
        }
863
    }
864
#endif
865
#if defined ZMQ_HAVE_TIPC
866 867 868 869 870 871
    else
    if (protocol == "tipc") {
        paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
        alloc_assert (paddr->resolved.tipc_addr);
        int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
        if (rc != 0) {
872
            LIBZMQ_DELETE(paddr);
873
            EXIT_MUTEX ();
874 875 876 877
            return -1;
        }
    }
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
878 879 880 881 882 883 884 885
#if defined ZMQ_HAVE_VMCI
    else
    if (protocol == "vmci") {
        paddr->resolved.vmci_addr = new (std::nothrow) vmci_address_t (this->get_ctx ());
        alloc_assert (paddr->resolved.vmci_addr);
        int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(paddr);
886
            EXIT_MUTEX ();
Ilya Kulakov's avatar
Ilya Kulakov committed
887 888 889 890
            return -1;
        }
    }
#endif
891

892
    //  Create session.
893
    session_base_t *session = session_base_t::create (io_thread, true, this,
894
        options, paddr);
895
    errno_assert (session);
896

897
    //  PGM does not support subscription forwarding; ask for all data to be
bebopagogo's avatar
bebopagogo committed
898 899
    //  sent to this pipe. (same for NORM, currently?)
    bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm";
900
    pipe_t *newpipe = NULL;
901

902
    if (options.immediate != 1 || subscribe_to_all) {
903 904
        //  Create a bi-directional pipe.
        object_t *parents [2] = {this, session};
905
        pipe_t *new_pipes [2] = {NULL, NULL};
906 907 908 909 910 911 912 913 914 915 916

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : options.sndhwm,
            conflate? -1 : options.rcvhwm};
        bool conflates [2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
917
        rc = pipepair (parents, new_pipes, hwms, conflates);
918
        errno_assert (rc == 0);
919

920
        //  Attach local end of the pipe to the socket object.
921
        attach_pipe (new_pipes [0], subscribe_to_all);
922
        newpipe = new_pipes [0];
Martin Sustrik's avatar
Martin Sustrik committed
923

924
        //  Attach remote end of the pipe to the session object later on.
925
        session->attach_pipe (new_pipes [1]);
926 927 928
    }

    //  Save last endpoint URI
929
    paddr->to_string (last_endpoint);
930

931
    add_endpoint (addr_, (own_t *) session, newpipe);
932
    EXIT_MUTEX ();
933 934 935
    return 0;
}

936
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
937
{
938
    //  Activate the session. Make it a child of this socket.
939
    launch_child (endpoint_);
Martin Hurton's avatar
Martin Hurton committed
940
    endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe)));
941 942 943 944
}

int zmq::socket_base_t::term_endpoint (const char *addr_)
{
945
    ENTER_MUTEX ();
somdoron's avatar
somdoron committed
946

947 948 949
    //  Check whether the library haven't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
950
        EXIT_MUTEX ();
951 952
        return -1;
    }
malosek's avatar
malosek committed
953

954
    //  Check whether endpoint address passed to the function is valid.
955 956
    if (unlikely (!addr_)) {
        errno = EINVAL;
957
        EXIT_MUTEX ();
958 959 960
        return -1;
    }

961 962 963
    //  Process pending commands, if any, since there could be pending unprocessed process_own()'s
    //  (from launch_child() for example) we're asked to terminate now.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
964
    if (unlikely(rc != 0)) {
965
        EXIT_MUTEX ();
966
        return -1;
somdoron's avatar
somdoron committed
967
    }
968

969 970 971
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
972
    if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) {
973
        EXIT_MUTEX ();
974
        return -1;
somdoron's avatar
somdoron committed
975
    }
976 977 978

    // Disconnect an inproc socket
    if (protocol == "inproc") {
somdoron's avatar
somdoron committed
979
        if (unregister_endpoint (std::string(addr_), this) == 0) {
980
            EXIT_MUTEX ();
Martin Hurton's avatar
Martin Hurton committed
981
            return 0;
somdoron's avatar
somdoron committed
982
        }
983 984 985
        std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
        if (range.first == range.second) {
            errno = ENOENT;
986
            EXIT_MUTEX ();
987 988
            return -1;
        }
Martin Hurton's avatar
Martin Hurton committed
989

990
        for (inprocs_t::iterator it = range.first; it != range.second; ++it)
Martin Hurton's avatar
Martin Hurton committed
991
            it->second->terminate (true);
992
        inprocs.erase (range.first, range.second);
993
        EXIT_MUTEX ();
994 995 996
        return 0;
    }

997 998
    //  Find the endpoints range (if any) corresponding to the addr_ string.
    std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
999 1000
    if (range.first == range.second) {
        errno = ENOENT;
1001
        EXIT_MUTEX ();
1002
        return -1;
1003
    }
1004

1005 1006 1007
    for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
        //  If we have an associated pipe, terminate it.
        if (it->second.second != NULL)
Martin Hurton's avatar
Martin Hurton committed
1008
            it->second.second->terminate (false);
1009
        term_child (it->second.first);
1010
    }
1011
    endpoints.erase (range.first, range.second);
1012
    EXIT_MUTEX ();
1013
    return 0;
1014 1015
}

1016
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
    //  Sends msg_ through the socket-type specific xsend ().
    //  Returns 0 on success, -1 with errno set otherwise (ETERM, EFAULT,
    //  EAGAIN on non-blocking failure or timeout, or the error reported by
    //  process_commands () / xsend ()).
    ENTER_MUTEX ();

    //  Refuse to work once the context has been terminated.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        EXIT_MUTEX ();
        return -1;
    }

    //  The message must exist and pass its own validity check.
    if (unlikely (msg_ == NULL || !msg_->check ())) {
        errno = EFAULT;
        EXIT_MUTEX ();
        return -1;
    }

    //  Process pending commands, if any (throttled).
    if (unlikely (process_commands (0, true) != 0)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  Clear any user-visible flags that are set on the message, then
    //  impose the 'more' flag according to what the caller asked for.
    msg_->reset_flags (msg_t::more);
    if (flags_ & ZMQ_SNDMORE)
        msg_->set_flags (msg_t::more);

    msg_->reset_metadata ();

    //  First attempt, using the method of the concrete socket class.
    if (xsend (msg_) == 0) {
        EXIT_MUTEX ();
        return 0;
    }
    if (unlikely (errno != EAGAIN)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  In case of non-blocking send we simply propagate the error -
    //  including EAGAIN - up the stack.
    const bool non_blocking =
        (flags_ & ZMQ_DONTWAIT) != 0 || options.sndtimeo == 0;
    if (non_blocking) {
        EXIT_MUTEX ();
        return -1;
    }

    //  Compute the moment at which the timeout expires. A negative timeout
    //  means "wait forever", in which case the deadline is never consulted.
    int remaining_ms = options.sndtimeo;
    uint64_t deadline;
    if (remaining_ms < 0)
        deadline = 0;
    else
        deadline = clock.now_ms () + remaining_ms;

    //  The message could not be sent. Wait for the next command, process it
    //  and try again. If the timeout is reached meanwhile, return EAGAIN.
    for (;;) {
        if (unlikely (process_commands (remaining_ms, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }
        if (xsend (msg_) == 0)
            break;
        if (unlikely (errno != EAGAIN)) {
            EXIT_MUTEX ();
            return -1;
        }
        if (remaining_ms > 0) {
            remaining_ms = (int) (deadline - clock.now_ms ());
            if (remaining_ms <= 0) {
                errno = EAGAIN;
                EXIT_MUTEX ();
                return -1;
            }
        }
    }

    EXIT_MUTEX ();
    return 0;
}

1102
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
    //  Receives a message through the socket-type specific xrecv ().
    //  Returns 0 on success, a negative value with errno set otherwise
    //  (ETERM, EFAULT, EAGAIN on non-blocking failure or timeout, or the
    //  error reported by process_commands () / xrecv ()).
    ENTER_MUTEX ();

    //  Refuse to work once the context has been terminated.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        EXIT_MUTEX ();
        return -1;
    }

    //  The message must exist and pass its own validity check.
    if (unlikely (msg_ == NULL || !msg_->check ())) {
        errno = EFAULT;
        EXIT_MUTEX ();
        return -1;
    }

    //  Once every inbound_poll_rate messages check for signals and process
    //  incoming commands. This happens only if we are not polling altogether
    //  because there are messages available all the time. If poll occurs,
    //  ticks is set to zero and thus we avoid this code.
    //
    //  Note that 'recv' uses a different command throttling algorithm (the
    //  one described above) from the one used by 'send'. This is because
    //  counting ticks is more efficient than doing RDTSC all the time.
    ++ticks;
    if (ticks == inbound_poll_rate) {
        if (unlikely (process_commands (0, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }
        ticks = 0;
    }

    //  First attempt: if a message is available, hand it out immediately.
    int rc = xrecv (msg_);
    if (rc == 0) {
        if (file_desc != retired_fd)
            msg_->set_fd (file_desc);
        extract_flags (msg_);
        EXIT_MUTEX ();
        return 0;
    }
    if (unlikely (errno != EAGAIN)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  The message cannot be fetched immediately; there are two scenarios.
    //  For non-blocking recv, commands are processed in case there is an
    //  activate_reader command already waiting in the command pipe.
    //  If there still is no message afterwards, the error is returned.
    const bool non_blocking =
        (flags_ & ZMQ_DONTWAIT) != 0 || options.rcvtimeo == 0;
    if (non_blocking) {
        if (unlikely (process_commands (0, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }
        ticks = 0;

        rc = xrecv (msg_);
        if (rc < 0) {
            EXIT_MUTEX ();
            return rc;
        }
        if (file_desc != retired_fd)
            msg_->set_fd (file_desc);
        extract_flags (msg_);

        EXIT_MUTEX ();
        return 0;
    }

    //  Compute the moment at which the timeout expires. A negative timeout
    //  means "wait forever", in which case the deadline is never consulted.
    int remaining_ms = options.rcvtimeo;
    uint64_t deadline;
    if (remaining_ms < 0)
        deadline = 0;
    else
        deadline = clock.now_ms () + remaining_ms;

    //  In the blocking scenario, commands are processed over and over again
    //  until we are able to fetch a message. The first round only blocks
    //  when ticks is non-zero; every later round blocks.
    bool may_block = (ticks != 0);
    for (;;) {
        const int wait_ms = may_block ? remaining_ms : 0;
        if (unlikely (process_commands (wait_ms, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }
        rc = xrecv (msg_);
        if (rc == 0) {
            ticks = 0;
            break;
        }
        if (unlikely (errno != EAGAIN)) {
            EXIT_MUTEX ();
            return -1;
        }
        may_block = true;
        if (remaining_ms > 0) {
            remaining_ms = (int) (deadline - clock.now_ms ());
            if (remaining_ms <= 0) {
                errno = EAGAIN;
                EXIT_MUTEX ();
                return -1;
            }
        }
    }

    if (file_desc != retired_fd)
        msg_->set_fd (file_desc);
    extract_flags (msg_);
    EXIT_MUTEX ();
    return 0;
}

int zmq::socket_base_t::close ()
{
1218 1219
    //  Mark the socket as dead
    tag = 0xdeadbeef;
1220

1221 1222 1223 1224
    //  Transfer the ownership of the socket from this application thread
    //  to the reaper thread which will take care of the rest of shutdown
    //  process.
    send_reap (this);
1225

1226 1227 1228
    return 0;
}

1229 1230 1231 1232 1233 1234 1235 1236 1237 1238
bool zmq::socket_base_t::has_in ()
{
    //  Delegates to xhas_in ().
    const bool readable = xhas_in ();
    return readable;
}

bool zmq::socket_base_t::has_out ()
{
    //  Delegates to xhas_out ().
    const bool writable = xhas_out ();
    return writable;
}

1239
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
    //  Plug the socket to the reaper thread's poller, then start the
    //  termination of the socket.
    poller = poller_;

    //  File descriptor the poller will watch for incoming commands.
    fd_t fd;

    if (!thread_safe)
        fd = ((mailbox_t*)mailbox)->get_fd();
    else {
        ENTER_MUTEX ();

        //  Allocate the signaler the same way as every other object in this
        //  file: without exceptions, asserting on allocation failure.
        reaper_signaler = new (std::nothrow) signaler_t ();
        alloc_assert (reaper_signaler);

        //  Add signaler to the safe mailbox
        fd = reaper_signaler->get_fd();
        ((mailbox_safe_t*)mailbox)->add_signaler(reaper_signaler);

        //  Send a signal to make sure reaper handle existing commands
        reaper_signaler->send();

        EXIT_MUTEX ();
    }

    handle = poller->add_fd (fd, this);
    poller->set_pollin (handle);

    //  Initialise the termination and check whether it can be deallocated
    //  immediately.
    terminate ();
    check_destroy ();
}

1272
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
Martin Sustrik's avatar
Martin Sustrik committed
1273
{
1274
    int rc;
1275
    command_t cmd;
1276 1277 1278
    if (timeout_ != 0) {

        //  If we are asked to wait, simply ask mailbox to wait.
somdoron's avatar
somdoron committed
1279
        rc = mailbox->recv (&cmd, timeout_);
1280 1281
    }
    else {
1282

1283 1284 1285
        //  If we are asked not to wait, check whether we haven't processed
        //  commands recently, so that we can throttle the new commands.

Martin Sustrik's avatar
Martin Sustrik committed
1286
        //  Get the CPU's tick counter. If 0, the counter is not available.
Martin Hurton's avatar
Martin Hurton committed
1287
        const uint64_t tsc = zmq::clock_t::rdtsc ();
Martin Sustrik's avatar
Martin Sustrik committed
1288

1289 1290 1291 1292 1293 1294
        //  Optimised version of command processing - it doesn't have to check
        //  for incoming commands each time. It does so only if certain time
        //  elapsed since last command processing. Command delay varies
        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
        //  etc. The optimisation makes sense only on platforms where getting
        //  a timestamp is a very cheap operation (tens of nanoseconds).
Martin Sustrik's avatar
Martin Sustrik committed
1295 1296
        if (tsc && throttle_) {

Martin Sustrik's avatar
Martin Sustrik committed
1297 1298 1299
            //  Check whether TSC haven't jumped backwards (in case of migration
            //  between CPU cores) and whether certain time have elapsed since
            //  last command processing. If it didn't do nothing.
Martin Sustrik's avatar
Martin Sustrik committed
1300
            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
1301
                return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1302
            last_tsc = tsc;
1303 1304 1305
        }

        //  Check whether there are any commands pending for this thread.
somdoron's avatar
somdoron committed
1306
        rc = mailbox->recv (&cmd, 0);
1307
    }
Martin Sustrik's avatar
Martin Sustrik committed
1308

1309 1310
    //  Process all available commands.
    while (rc == 0) {
1311
        cmd.destination->process_command (cmd);
somdoron's avatar
somdoron committed
1312
        rc = mailbox->recv (&cmd, 0);
1313 1314 1315 1316 1317 1318
    }

    if (errno == EINTR)
        return -1;

    zmq_assert (errno == EAGAIN);
1319

1320
    if (ctx_terminated) {
1321 1322
        errno = ETERM;
        return -1;
1323
    }
1324 1325

    return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1326 1327
}

1328
void zmq::socket_base_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
1329
{
1330
    //  Here, someone have called zmq_term while the socket was still alive.
1331
    //  We'll remember the fact so that any blocking call is interrupted and any
1332 1333
    //  further attempt to use the socket will return ETERM. The user is still
    //  responsible for calling zmq_close on the socket though!
1334
    stop_monitor ();
1335
    ctx_terminated = true;
Martin Sustrik's avatar
Martin Sustrik committed
1336 1337
}

1338
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
{
    //  Attach the pipe delivered with the bind command to this socket.
    attach_pipe (pipe_);
}

1343
void zmq::socket_base_t::process_term (int linger_)
1344
{
1345 1346 1347 1348 1349
    //  Unregister all inproc endpoints associated with this socket.
    //  Doing this we make sure that no new pipes from other sockets (inproc)
    //  will be initiated.
    unregister_endpoints (this);

1350 1351
    //  Ask all attached pipes to terminate.
    for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
1352
        pipes [i]->terminate (false);
Martin Sustrik's avatar
Martin Sustrik committed
1353
    register_term_acks ((int) pipes.size ());
1354

1355
    //  Continue the termination process immediately.
1356
    own_t::process_term (linger_);
1357 1358
}

1359 1360
void zmq::socket_base_t::update_pipe_options(int option_)
{
1361 1362 1363 1364 1365 1366 1367
    if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM)
    {
        for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
        {
            pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
        }
    }
1368 1369 1370

}

1371 1372 1373 1374 1375
void zmq::socket_base_t::process_destroy ()
{
    destroyed = true;
}

1376
int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
{
    //  Base implementation: every option is rejected with EINVAL.
    errno = EINVAL;
    return -1;
}
1381 1382 1383 1384 1385 1386

bool zmq::socket_base_t::xhas_out ()
{
    //  Base implementation: never reports that a message can be written.
    return false;
}

1387
int zmq::socket_base_t::xsend (msg_t *)
{
    //  Base implementation: sending is not supported.
    errno = ENOTSUP;
    return -1;
}

bool zmq::socket_base_t::xhas_in ()
{
    //  Base implementation: never reports that a message can be read.
    return false;
}

1398
int zmq::socket_base_t::xrecv (msg_t *)
{
    //  Base implementation: receiving is not supported.
    errno = ENOTSUP;
    return -1;
}

1404 1405 1406 1407 1408
zmq::blob_t zmq::socket_base_t::get_credential () const
{
    //  Base implementation: returns a default-constructed blob.
    return blob_t ();
}

1409
void zmq::socket_base_t::xread_activated (pipe_t *)
{
    //  Base implementation must never be reached; the assertion always fails.
    zmq_assert (false);
}
1413
void zmq::socket_base_t::xwrite_activated (pipe_t *)
{
    //  Base implementation must never be reached; the assertion always fails.
    zmq_assert (false);
}

1418
void zmq::socket_base_t::xhiccuped (pipe_t *)
{
    //  Base implementation must never be reached; the assertion always fails.
    zmq_assert (false);
}

1423 1424
void zmq::socket_base_t::in_event ()
{
1425 1426 1427 1428
    //  This function is invoked only once the socket is running in the context
    //  of the reaper thread. Process any commands from other threads/sockets
    //  that may be available at the moment. Ultimately, the socket will
    //  be destroyed.
1429
    ENTER_MUTEX ();
1430

1431 1432 1433
    //  If the socket is thread safe we need to unsignal the reaper signaler
    if (thread_safe)
        reaper_signaler->recv();
1434

1435
    process_commands (0, false);
1436
    EXIT_MUTEX ();
1437 1438 1439 1440 1441 1442 1443 1444
    check_destroy ();
}

void zmq::socket_base_t::out_event ()
{
    //  Not expected to be called for a socket; the assertion always fails.
    zmq_assert (false);
}

1445
void zmq::socket_base_t::timer_event (int)
1446 1447 1448
{
    zmq_assert (false);
}
1449

1450 1451
void zmq::socket_base_t::check_destroy ()
{
1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467
    //  If the object was already marked as destroyed, finish the deallocation.
    if (destroyed) {

        //  Remove the socket from the reaper's poller.
        poller->rm_fd (handle);

        //  Remove the socket from the context.
        destroy_socket (this);

        //  Notify the reaper about the fact.
        send_reaped ();

        //  Deallocate.
        own_t::process_destroy ();
    }
}
1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478

void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
    //  Forward the notification to xread_activated ().
    this->xread_activated (pipe_);
}

void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
    //  Forward the notification to xwrite_activated ().
    this->xwrite_activated (pipe_);
}

1479 1480
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
{
    //  Unless the 'immediate' option is set to 1, notify derived sockets of
    //  the hiccup.
    if (options.immediate != 1) {
        xhiccuped (pipe_);
        return;
    }

    //  With immediate == 1 the pipe is terminated instead.
    pipe_->terminate (false);
}

1488
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
{
    //  Notify the specific socket type about the pipe termination.
    xpipe_terminated (pipe_);

    //  Drop the first inproc entry that refers to this pipe, if any.
    inprocs_t::iterator pos = inprocs.begin ();
    while (pos != inprocs.end () && pos->second != pipe_)
        ++pos;
    if (pos != inprocs.end ())
        inprocs.erase (pos);

    //  Remove the pipe from the list of attached pipes and confirm its
    //  termination if we are already shutting down.
    pipes.erase (pipe_);
    if (is_terminating ())
        unregister_term_ack ();
}

1507 1508
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
    //  A message carrying the IDENTITY flag is only acceptable when the
    //  recv_identity option is set.
    if (unlikely (msg_->flags () & msg_t::identity)) {
        zmq_assert (options.recv_identity);
    }

    //  Record whether the message carries the MORE flag.
    rcvmore = (msg_->flags () & msg_t::more) != 0;
}
1516

1517
int zmq::socket_base_t::monitor (const char *addr_, int events_)
1518
{
1519 1520 1521 1522
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }
1523
    //  Support deregistering monitoring endpoints as well
1524 1525 1526 1527 1528 1529 1530
    if (addr_ == NULL) {
        stop_monitor ();
        return 0;
    }
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
Pieter Hintjens's avatar
Pieter Hintjens committed
1531
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
1532 1533
        return -1;

1534
    //  Event notification only supported over inproc://
1535 1536 1537 1538
    if (protocol != "inproc") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
1539
    //  Register events to monitor
1540
    monitor_events = events_;
Martin Hurton's avatar
Martin Hurton committed
1541
    monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
1542 1543 1544
    if (monitor_socket == NULL)
        return -1;

1545
    //  Never block context termination on pending event messages
1546
    int linger = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
1547
    int rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1548
    if (rc == -1)
1549
        stop_monitor (false);
1550

1551
    //  Spawn the monitor socket endpoint
1552 1553
    rc = zmq_bind (monitor_socket, addr_);
    if (rc == -1)
1554
         stop_monitor (false);
1555 1556 1557
    return rc;
}

1558 1559 1560 1561 1562 1563 1564 1565 1566 1567
void zmq::socket_base_t::set_fd(zmq::fd_t fd_)
{
    //  Remember the descriptor; recv () attaches it to received messages
    //  when it differs from retired_fd.
    this->file_desc = fd_;
}

zmq::fd_t zmq::socket_base_t::fd()
{
    //  Return the descriptor last stored by set_fd ().
    return this->file_desc;
}

Martin Hurton's avatar
Martin Hurton committed
1568
void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_)
1569
{
1570 1571
    if (monitor_events & ZMQ_EVENT_CONNECTED)
        monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_);
1572
}
1573

Martin Hurton's avatar
Martin Hurton committed
1574
void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_)
1575
{
1576 1577
    if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED)
        monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_);
1578
}
1579

Martin Hurton's avatar
Martin Hurton committed
1580
void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_)
1581
{
1582 1583
    if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED)
        monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_);
1584 1585
}

Martin Hurton's avatar
Martin Hurton committed
1586
void zmq::socket_base_t::event_listening (const std::string &addr_, int fd_)
1587
{
1588 1589
    if (monitor_events & ZMQ_EVENT_LISTENING)
        monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_);
1590 1591
}

Martin Hurton's avatar
Martin Hurton committed
1592
void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
1593
{
1594 1595
    if (monitor_events & ZMQ_EVENT_BIND_FAILED)
        monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_);
1596 1597
}

Martin Hurton's avatar
Martin Hurton committed
1598
void zmq::socket_base_t::event_accepted (const std::string &addr_, int fd_)
1599
{
1600 1601
    if (monitor_events & ZMQ_EVENT_ACCEPTED)
        monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_);
1602 1603
}

Martin Hurton's avatar
Martin Hurton committed
1604
void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_)
1605
{
1606 1607
    if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED)
        monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_);
1608 1609
}

Martin Hurton's avatar
Martin Hurton committed
1610
void zmq::socket_base_t::event_closed (const std::string &addr_, int fd_)
1611
{
1612 1613
    if (monitor_events & ZMQ_EVENT_CLOSED)
        monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_);
1614
}
Martin Hurton's avatar
Martin Hurton committed
1615 1616

void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
1617
{
1618 1619
    if (monitor_events & ZMQ_EVENT_CLOSE_FAILED)
        monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_);
1620 1621
}

Martin Hurton's avatar
Martin Hurton committed
1622
void zmq::socket_base_t::event_disconnected (const std::string &addr_, int fd_)
1623
{
1624 1625
    if (monitor_events & ZMQ_EVENT_DISCONNECTED)
        monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_);
1626 1627
}

1628 1629
//  Send a monitor event
void zmq::socket_base_t::monitor_event (int event_, int value_, const std::string &addr_)
1630
{
1631
    if (monitor_socket) {
1632
        //  Send event in first frame
1633
        zmq_msg_t msg;
1634 1635
        zmq_msg_init_size (&msg, 6);
        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
1636 1637 1638 1639 1640
        //  Avoid dereferencing uint32_t on unaligned address
        uint16_t event = (uint16_t) event_;
        uint32_t value = (uint32_t) value_;
        memcpy (data + 0, &event, sizeof(event));
        memcpy (data + 2, &value, sizeof(value));
1641
        zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
1642 1643

        //  Send address in second frame
1644
        zmq_msg_init_size (&msg, addr_.size());
1645
        memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
1646 1647
        zmq_sendmsg (monitor_socket, &msg, 0);
    }
1648 1649
}

1650
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
1651 1652
{
    if (monitor_socket) {
1653
        if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_)
1654
            monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
1655 1656 1657 1658
        zmq_close (monitor_socket);
        monitor_socket = NULL;
        monitor_events = 0;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
1659
}