socket_base.cpp 45 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include <new>
31
#include <string>
32 33
#include <algorithm>

34
#include "macros.hpp"
35 36 37 38 39
#include "platform.hpp"

#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#if defined _MSC_VER
40
#if defined _WIN32_WCE
boris@boressoft.ru's avatar
boris@boressoft.ru committed
41 42
#include <cmnintrin.h>
#else
43 44
#include <intrin.h>
#endif
boris@boressoft.ru's avatar
boris@boressoft.ru committed
45
#endif
46 47 48
#else
#include <unistd.h>
#endif
49

50
#include "socket_base.hpp"
51
#include "tcp_listener.hpp"
52
#include "ipc_listener.hpp"
53
#include "tipc_listener.hpp"
54
#include "tcp_connecter.hpp"
55
#include "io_thread.hpp"
56
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
57
#include "config.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
58
#include "pipe.hpp"
59
#include "err.hpp"
60
#include "ctx.hpp"
malosek's avatar
malosek committed
61
#include "platform.hpp"
62
#include "likely.hpp"
63
#include "msg.hpp"
64 65 66
#include "address.hpp"
#include "ipc_address.hpp"
#include "tcp_address.hpp"
67
#include "tipc_address.hpp"
somdoron's avatar
somdoron committed
68 69
#include "mailbox.hpp"
#include "mailbox_safe.hpp"
70 71 72
#ifdef ZMQ_HAVE_OPENPGM
#include "pgm_socket.hpp"
#endif
73

74 75 76 77 78 79 80
#include "pair.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "pull.hpp"
#include "push.hpp"
81 82
#include "dealer.hpp"
#include "router.hpp"
83 84
#include "xpub.hpp"
#include "xsub.hpp"
85
#include "stream.hpp"
86
#include "server.hpp"
87
#include "client.hpp"
88

somdoron's avatar
somdoron committed
89 90 91 92 93 94 95 96
#define ENTER_MUTEX() \
    if (thread_safe) \
        sync.lock();

#define EXIT_MUTEX() \
    if (thread_safe) \
        sync.unlock();

97 98 99 100 101
bool zmq::socket_base_t::check_tag ()
{
    return tag == 0xbaddecaf;
}

102
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
103
    uint32_t tid_, int sid_)
104 105 106
{
    socket_base_t *s = NULL;
    switch (type_) {
Pieter Hintjens's avatar
Pieter Hintjens committed
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
        case ZMQ_PAIR:
            s = new (std::nothrow) pair_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUB:
            s = new (std::nothrow) pub_t (parent_, tid_, sid_);
            break;
        case ZMQ_SUB:
            s = new (std::nothrow) sub_t (parent_, tid_, sid_);
            break;
        case ZMQ_REQ:
            s = new (std::nothrow) req_t (parent_, tid_, sid_);
            break;
        case ZMQ_REP:
            s = new (std::nothrow) rep_t (parent_, tid_, sid_);
            break;
        case ZMQ_DEALER:
            s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
            break;
        case ZMQ_ROUTER:
            s = new (std::nothrow) router_t (parent_, tid_, sid_);
            break;
        case ZMQ_PULL:
            s = new (std::nothrow) pull_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUSH:
            s = new (std::nothrow) push_t (parent_, tid_, sid_);
            break;
        case ZMQ_XPUB:
            s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
            break;
        case ZMQ_XSUB:
            s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
            break;
        case ZMQ_STREAM:
            s = new (std::nothrow) stream_t (parent_, tid_, sid_);
            break;
143 144 145
        case ZMQ_SERVER:
            s = new (std::nothrow) server_t (parent_, tid_, sid_);
            break;
146 147 148
        case ZMQ_CLIENT:
            s = new (std::nothrow) client_t (parent_, tid_, sid_);
            break;
Pieter Hintjens's avatar
Pieter Hintjens committed
149 150 151
        default:
            errno = EINVAL;
            return NULL;
152
    }
153 154

    alloc_assert (s);
somdoron's avatar
somdoron committed
155 156 157

    mailbox_t *mailbox = dynamic_cast<mailbox_t*> (s->mailbox);

158 159
    if (mailbox != NULL && mailbox->get_fd () == retired_fd) {
        s->destroyed = true;
160
        LIBZMQ_DELETE(s);
Pieter Hintjens's avatar
Pieter Hintjens committed
161
        return NULL;
162
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
163

164 165 166
    return s;
}

somdoron's avatar
somdoron committed
167
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
Martin Sustrik's avatar
Martin Sustrik committed
168
    own_t (parent_, tid_),
169
    tag (0xbaddecaf),
170
    ctx_terminated (false),
171
    destroyed (false),
Martin Sustrik's avatar
Martin Sustrik committed
172
    last_tsc (0),
Martin Sustrik's avatar
Martin Sustrik committed
173
    ticks (0),
174
    rcvmore (false),
175
    file_desc(-1),
176
    monitor_socket (NULL),
somdoron's avatar
somdoron committed
177
    monitor_events (0),
178 179
    thread_safe (thread_safe_),
    reaper_signaler (NULL)
Martin Sustrik's avatar
Martin Sustrik committed
180
{
181
    options.socket_id = sid_;
182
    options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
183
    options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;
somdoron's avatar
somdoron committed
184 185 186 187 188

    if (thread_safe)
        mailbox = new mailbox_safe_t(&sync);
    else
        mailbox = new mailbox_t();
189 190 191 192
}

zmq::socket_base_t::~socket_base_t ()
{
193
    LIBZMQ_DELETE(mailbox);
194
    
195 196 197
    if (reaper_signaler) {
        LIBZMQ_DELETE(reaper_signaler);
    }
198
    
199
    stop_monitor ();
200
    zmq_assert (destroyed);
201 202
}

somdoron's avatar
somdoron committed
203
zmq::i_mailbox *zmq::socket_base_t::get_mailbox ()
204
{
somdoron's avatar
somdoron committed
205
    return mailbox;
206 207 208 209 210 211 212 213 214 215 216
}

void zmq::socket_base_t::stop ()
{
    //  Called by ctx when it is terminated (zmq_term).
    //  'stop' command is sent from the threads that called zmq_term to
    //  the thread owning the socket. This way, blocking call in the
    //  owner thread can be interrupted.
    send_stop ();
}

217 218 219 220 221 222 223 224 225 226 227 228 229
int zmq::socket_base_t::parse_uri (const char *uri_,
                        std::string &protocol_, std::string &address_)
{
    zmq_assert (uri_ != NULL);

    std::string uri (uri_);
    std::string::size_type pos = uri.find ("://");
    if (pos == std::string::npos) {
        errno = EINVAL;
        return -1;
    }
    protocol_ = uri.substr (0, pos);
    address_ = uri.substr (pos + 3);
Martin Hurton's avatar
Martin Hurton committed
230

231 232 233 234 235 236 237
    if (protocol_.empty () || address_.empty ()) {
        errno = EINVAL;
        return -1;
    }
    return 0;
}

238 239
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
240
    //  First check out whether the protocol is something we are aware of.
Pieter Hintjens's avatar
Pieter Hintjens committed
241 242 243 244 245 246 247
    if (protocol_ != "inproc"
    &&  protocol_ != "ipc"
    &&  protocol_ != "tcp"
    &&  protocol_ != "pgm"
    &&  protocol_ != "epgm"
    &&  protocol_ != "tipc"
    &&  protocol_ != "norm") {
248 249 250 251
        errno = EPROTONOSUPPORT;
        return -1;
    }
    //  If 0MQ is not compiled with OpenPGM, pgm and epgm transports
252
    //  are not available.
253 254 255 256 257 258
#if !defined ZMQ_HAVE_OPENPGM
    if (protocol_ == "pgm" || protocol_ == "epgm") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif
Martin Hurton's avatar
Martin Hurton committed
259

bebopagogo's avatar
bebopagogo committed
260 261 262 263 264 265
#if !defined ZMQ_HAVE_NORM
    if (protocol_ == "norm") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif // !ZMQ_HAVE_NORM
266 267 268

    //  IPC transport is not available on Windows and OpenVMS.
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
269
    if (protocol_ == "ipc") {
270 271 272 273 274 275
        //  Unknown protocol.
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

276
    // TIPC transport is only available on Linux.
277
#if !defined ZMQ_HAVE_TIPC
Erik Hugne's avatar
Erik Hugne committed
278
    if (protocol_ == "tipc") {
279 280 281 282 283
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

284 285 286
    //  Check whether socket type and transport protocol match.
    //  Specifically, multicast protocols can't be combined with
    //  bi-directional messaging patterns (socket types).
bebopagogo's avatar
bebopagogo committed
287
    if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") &&
288 289
          options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
          options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
290 291 292 293 294 295 296 297
        errno = ENOCOMPATPROTO;
        return -1;
    }

    //  Protocol is available.
    return 0;
}

298
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
299
{
300 301 302
    //  First, register the pipe so that we can terminate it later on.
    pipe_->set_event_sink (this);
    pipes.push_back (pipe_);
303

304
    //  Let the derived socket type know about new pipe.
305
    xattach_pipe (pipe_, subscribe_to_all_);
306 307 308 309 310

    //  If the socket is already being closed, ask any new pipes to terminate
    //  straight away.
    if (is_terminating ()) {
        register_term_acks (1);
311
        pipe_->terminate (false);
312
    }
313 314
}

315
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
316
    size_t optvallen_)
317
{
somdoron's avatar
somdoron committed
318 319
    ENTER_MUTEX();

320 321 322 323 324
    if (!options.is_valid(option_)) {
        errno = EINVAL;
        EXIT_MUTEX();
        return -1;
    }
325

326
    if (unlikely (ctx_terminated)) {
327
        errno = ETERM;
somdoron's avatar
somdoron committed
328
        EXIT_MUTEX();
329 330 331
        return -1;
    }

332 333
    //  First, check whether specific socket type overloads the option.
    int rc = xsetsockopt (option_, optval_, optvallen_);
somdoron's avatar
somdoron committed
334 335
    if (rc == 0 || errno != EINVAL) {
        EXIT_MUTEX();
336
        return rc;
somdoron's avatar
somdoron committed
337
    }
338 339 340

    //  If the socket type doesn't support the option, pass it to
    //  the generic option parser.
somdoron's avatar
somdoron committed
341
    rc = options.setsockopt (option_, optval_, optvallen_);
342
    update_pipe_options(option_);
somdoron's avatar
somdoron committed
343 344 345

    EXIT_MUTEX();
    return rc;
346 347
}

348 349 350
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
    size_t *optvallen_)
{
somdoron's avatar
somdoron committed
351 352
    ENTER_MUTEX();

353
    if (unlikely (ctx_terminated)) {
354
        errno = ETERM;
somdoron's avatar
somdoron committed
355
        EXIT_MUTEX();
356 357 358
        return -1;
    }

359
    if (option_ == ZMQ_RCVMORE) {
360
        if (*optvallen_ < sizeof (int)) {
361
            errno = EINVAL;
somdoron's avatar
somdoron committed
362
            EXIT_MUTEX();
363 364
            return -1;
        }
365
        memset(optval_, 0, *optvallen_);
366 367
        *((int*) optval_) = rcvmore ? 1 : 0;
        *optvallen_ = sizeof (int);
somdoron's avatar
somdoron committed
368
        EXIT_MUTEX();
369 370 371
        return 0;
    }

372 373 374
    if (option_ == ZMQ_FD) {
        if (*optvallen_ < sizeof (fd_t)) {
            errno = EINVAL;
somdoron's avatar
somdoron committed
375
            EXIT_MUTEX();
376 377
            return -1;
        }
somdoron's avatar
somdoron committed
378 379 380 381 382 383 384

        if (thread_safe) {
            // thread safe socket doesn't provide file descriptor
            errno = EINVAL;
            EXIT_MUTEX();
            return -1;
        }
385

somdoron's avatar
somdoron committed
386
        *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
387 388
        *optvallen_ = sizeof(fd_t);

somdoron's avatar
somdoron committed
389
        EXIT_MUTEX();
390 391 392 393
        return 0;
    }

    if (option_ == ZMQ_EVENTS) {
394
        if (*optvallen_ < sizeof (int)) {
395
            errno = EINVAL;
somdoron's avatar
somdoron committed
396
            EXIT_MUTEX();
397 398
            return -1;
        }
399
        int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
400 401
        if (rc != 0 && (errno == EINTR || errno == ETERM)) {
            EXIT_MUTEX();
402
            return -1;
somdoron's avatar
somdoron committed
403
        }
404
        errno_assert (rc == 0);
405
        *((int*) optval_) = 0;
406
        if (has_out ())
407
            *((int*) optval_) |= ZMQ_POLLOUT;
408
        if (has_in ())
409 410
            *((int*) optval_) |= ZMQ_POLLIN;
        *optvallen_ = sizeof (int);
somdoron's avatar
somdoron committed
411
        EXIT_MUTEX();
412 413 414
        return 0;
    }

415 416 417
    if (option_ == ZMQ_LAST_ENDPOINT) {
        if (*optvallen_ < last_endpoint.size () + 1) {
            errno = EINVAL;
somdoron's avatar
somdoron committed
418
            EXIT_MUTEX();
419 420 421 422
            return -1;
        }
        strcpy (static_cast <char *> (optval_), last_endpoint.c_str ());
        *optvallen_ = last_endpoint.size () + 1;
somdoron's avatar
somdoron committed
423
        EXIT_MUTEX();
424 425 426
        return 0;
    }

427 428 429 430 431 432 433 434 435 436 437 438 439
    if (option_ == ZMQ_THREAD_SAFE) {
        if (*optvallen_ < sizeof (int)) {
            errno = EINVAL;
            EXIT_MUTEX();
            return -1;
        }
        memset(optval_, 0, *optvallen_);
        *((int*) optval_) = thread_safe	? 1 : 0;
        *optvallen_ = sizeof (int);
        EXIT_MUTEX();
        return 0;
    }  

somdoron's avatar
somdoron committed
440 441 442
    int rc = options.getsockopt (option_, optval_, optvallen_);
    EXIT_MUTEX();
    return rc;
443 444
}

445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476
int zmq::socket_base_t::add_signaler(signaler_t *s_)
{
    ENTER_MUTEX();

    if (!thread_safe) {
        errno = EINVAL;
        EXIT_MUTEX();
        return -1;  
    }

    ((mailbox_safe_t*)mailbox)->add_signaler(s_);

    EXIT_MUTEX();
    return 0;
}

int zmq::socket_base_t::remove_signaler(signaler_t *s_)
{
    ENTER_MUTEX();

    if (!thread_safe) {
        errno = EINVAL;
        EXIT_MUTEX();
        return -1;
    }

    ((mailbox_safe_t*)mailbox)->remove_signaler(s_);

    EXIT_MUTEX();
    return 0;
}

477 478
int zmq::socket_base_t::bind (const char *addr_)
{
somdoron's avatar
somdoron committed
479 480
    ENTER_MUTEX();

481
    if (unlikely (ctx_terminated)) {
482
        errno = ETERM;
somdoron's avatar
somdoron committed
483
        EXIT_MUTEX();
484 485 486
        return -1;
    }

487 488
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
489 490
    if (unlikely (rc != 0)) {
        EXIT_MUTEX();
491
        return -1;
somdoron's avatar
somdoron committed
492
    }
493

494
    //  Parse addr_ string.
495 496
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
497 498
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
        EXIT_MUTEX();
499
        return -1;
somdoron's avatar
somdoron committed
500
    }
501

Pieter Hintjens's avatar
Pieter Hintjens committed
502
    if (protocol == "inproc") {
503
        const endpoint_t endpoint = { this, options };
Martin Hurton's avatar
Martin Hurton committed
504
        const int rc = register_endpoint (addr_, endpoint);
505
        if (rc == 0) {
Martin Hurton's avatar
Martin Hurton committed
506
            connect_pending (addr_, this);
507
            last_endpoint.assign (addr_);
508
            options.connected = true;
509
        }
somdoron's avatar
somdoron committed
510
        EXIT_MUTEX();
511
        return rc;
512
    }
513

bebopagogo's avatar
bebopagogo committed
514
    if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
515
        //  For convenience's sake, bind can be used interchangeable with
bebopagogo's avatar
bebopagogo committed
516
        //  connect for PGM, EPGM and NORM transports.
somdoron's avatar
somdoron committed
517
        EXIT_MUTEX();
518 519 520 521
        rc = connect (addr_);
        if (rc != -1)
            options.connected = true;
        return rc;
522 523
    }

524
    //  Remaining transports require to be run in an I/O thread, so at this
525 526 527 528
    //  point we'll choose one.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
somdoron's avatar
somdoron committed
529
        EXIT_MUTEX();
530 531
        return -1;
    }
532

533
    if (protocol == "tcp") {
534
        tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
535
            io_thread, this, options);
536
        alloc_assert (listener);
537
        int rc = listener->set_address (address.c_str ());
538
        if (rc != 0) {
539
            LIBZMQ_DELETE(listener);
540
            event_bind_failed (address, zmq_errno());
somdoron's avatar
somdoron committed
541
            EXIT_MUTEX();
542
            return -1;
543
        }
544

545
        // Save last endpoint URI
546
        listener->get_address (last_endpoint);
547

548
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
549
        options.connected = true;
somdoron's avatar
somdoron committed
550
        EXIT_MUTEX();
551 552 553
        return 0;
    }

554
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
555 556 557 558
    if (protocol == "ipc") {
        ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
559
        int rc = listener->set_address (address.c_str ());
560
        if (rc != 0) {
561
            LIBZMQ_DELETE(listener);
562
            event_bind_failed (address, zmq_errno());
somdoron's avatar
somdoron committed
563
            EXIT_MUTEX();
564 565
            return -1;
        }
566

567
        // Save last endpoint URI
568
        listener->get_address (last_endpoint);
569

570
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
571
        options.connected = true;
somdoron's avatar
somdoron committed
572
        EXIT_MUTEX();
573
        return 0;
574
    }
575
#endif
576
#if defined ZMQ_HAVE_TIPC
577 578 579 580 581 582
    if (protocol == "tipc") {
         tipc_listener_t *listener = new (std::nothrow) tipc_listener_t (
              io_thread, this, options);
         alloc_assert (listener);
         int rc = listener->set_address (address.c_str ());
         if (rc != 0) {
583
             LIBZMQ_DELETE(listener);
584
             event_bind_failed (address, zmq_errno());
somdoron's avatar
somdoron committed
585
             EXIT_MUTEX();
586 587 588 589 590 591 592
             return -1;
         }

        // Save last endpoint URI
        listener->get_address (last_endpoint);

        add_endpoint (addr_, (own_t *) listener, NULL);
593
        options.connected = true;
somdoron's avatar
somdoron committed
594
        EXIT_MUTEX();
595 596 597
        return 0;
    }
#endif
598

somdoron's avatar
somdoron committed
599
    EXIT_MUTEX();
600
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
601
    return -1;
602 603
}

604
int zmq::socket_base_t::connect (const char *addr_)
605
{
somdoron's avatar
somdoron committed
606 607
    ENTER_MUTEX();

608
    if (unlikely (ctx_terminated)) {
609
        errno = ETERM;
somdoron's avatar
somdoron committed
610
        EXIT_MUTEX();
611 612 613
        return -1;
    }

614 615
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
616 617
    if (unlikely (rc != 0)) {
        EXIT_MUTEX();
618
        return -1;
somdoron's avatar
somdoron committed
619
    }
620

malosek's avatar
malosek committed
621
    //  Parse addr_ string.
622 623
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
624 625
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
        EXIT_MUTEX();
626
        return -1;
somdoron's avatar
somdoron committed
627
    }
malosek's avatar
malosek committed
628

Pieter Hintjens's avatar
Pieter Hintjens committed
629
    if (protocol == "inproc") {
630

631 632 633 634
        //  TODO: inproc connect is specific with respect to creating pipes
        //  as there's no 'reconnect' functionality implemented. Once that
        //  is in place we should follow generic pipe creation algorithm.

635 636
        //  Find the peer endpoint.
        endpoint_t peer = find_endpoint (addr_);
637

638
        // The total HWM for an inproc connection should be the sum of
639
        // the binder's HWM and the connector's HWM.
Martin Hurton's avatar
Martin Hurton committed
640
        int sndhwm = 0;
641 642 643
        if (peer.socket == NULL)
            sndhwm = options.sndhwm;
        else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
644
            sndhwm = options.sndhwm + peer.options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
645
        int rcvhwm = 0;
646 647
        if (peer.socket == NULL)
            rcvhwm = options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
648 649
        else
        if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
650
            rcvhwm = options.rcvhwm + peer.options.sndhwm;
651

652
        //  Create a bi-directional pipe to connect the peers.
653
        object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
654
        pipe_t *new_pipes [2] = {NULL, NULL};
655 656 657 658 659 660 661 662 663 664

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
        bool conflates [2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
665
        int rc = pipepair (parents, new_pipes, hwms, conflates);
666 667 668 669 670
        if (!conflate) {
            new_pipes[0]->set_hwms_boost(peer.options.sndhwm, peer.options.rcvhwm);
            new_pipes[1]->set_hwms_boost(options.sndhwm, options.rcvhwm);
        }

671
        errno_assert (rc == 0);
672

673 674 675 676 677 678 679 680 681 682 683 684 685 686
        if (!peer.socket) {
            //  The peer doesn't exist yet so we don't know whether
            //  to send the identity message or not. To resolve this,
            //  we always send our identity and drop it later if
            //  the peer doesn't expect it.
            msg_t id;
            rc = id.init_size (options.identity_size);
            errno_assert (rc == 0);
            memcpy (id.data (), options.identity, options.identity_size);
            id.set_flags (msg_t::identity);
            bool written = new_pipes [0]->write (&id);
            zmq_assert (written);
            new_pipes [0]->flush ();

Martin Hurton's avatar
Martin Hurton committed
687 688
            const endpoint_t endpoint = {this, options};
            pend_connection (std::string (addr_), endpoint, new_pipes);
689
        }
Martin Hurton's avatar
Martin Hurton committed
690
        else {
691 692 693 694 695 696 697 698 699 700 701
            //  If required, send the identity of the local socket to the peer.
            if (peer.options.recv_identity) {
                msg_t id;
                rc = id.init_size (options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), options.identity, options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [0]->write (&id);
                zmq_assert (written);
                new_pipes [0]->flush ();
            }
702

703 704 705 706 707 708 709 710 711 712 713
            //  If required, send the identity of the peer to the local socket.
            if (options.recv_identity) {
                msg_t id;
                rc = id.init_size (peer.options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), peer.options.identity, peer.options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [1]->write (&id);
                zmq_assert (written);
                new_pipes [1]->flush ();
            }
714

715 716 717 718 719
            //  Attach remote end of the pipe to the peer socket. Note that peer's
            //  seqnum was incremented in find_endpoint function. We don't need it
            //  increased here.
            send_bind (peer.socket, new_pipes [1], false);
        }
720

Martin Hurton's avatar
Martin Hurton committed
721 722 723
        //  Attach local end of the pipe to this socket object.
        attach_pipe (new_pipes [0]);

724
        // Save last endpoint URI
725
        last_endpoint.assign (addr_);
726

727
        // remember inproc connections for disconnect
Martin Hurton's avatar
Martin Hurton committed
728
        inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
729

730
        options.connected = true;
somdoron's avatar
somdoron committed
731
        EXIT_MUTEX();
732 733
        return 0;
    }
734 735 736 737
    bool is_single_connect = (options.type == ZMQ_DEALER ||
                              options.type == ZMQ_SUB ||
                              options.type == ZMQ_REQ);
    if (unlikely (is_single_connect)) {
Martin Hurton's avatar
Martin Hurton committed
738
        const endpoints_t::iterator it = endpoints.find (addr_);
739 740 741 742
        if (it != endpoints.end ()) {
            // There is no valid use for multiple connects for SUB-PUB nor
            // DEALER-ROUTER nor REQ-REP. Multiple connects produces
            // nonsensical results.
somdoron's avatar
somdoron committed
743
            EXIT_MUTEX();
744 745 746
            return 0;
        }
    }
747

748 749 750 751
    //  Choose the I/O thread to run the session in.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
somdoron's avatar
somdoron committed
752
        EXIT_MUTEX();
753 754 755
        return -1;
    }

756
    address_t *paddr = new (std::nothrow) address_t (protocol, address);
757
    alloc_assert (paddr);
758 759 760

    //  Resolve address (if needed by the protocol)
    if (protocol == "tcp") {
761 762 763 764 765 766 767 768 769
        //  Do some basic sanity checks on tcp:// address syntax
        //  - hostname starts with digit or letter, with embedded '-' or '.'
        //  - IPv6 address may contain hex chars and colons.
        //  - IPv4 address may contain decimal digits and dots.
        //  - Address must end in ":port" where port is *, or numeric
        //  - Address may contain two parts separated by ':'
        //  Following code is quick and dirty check to catch obvious errors,
        //  without trying to be fully accurate.
        const char *check = address.c_str ();
770
        if (isalnum (*check) || isxdigit (*check) || *check == '[') {
771 772 773
            check++;
            while (isalnum  (*check)
                || isxdigit (*check)
774 775
                || *check == '.' || *check == '-' || *check == ':'|| *check == ';'
                || *check == ']')
776 777 778 779 780 781 782 783 784 785 786 787 788 789 790
                check++;
        }
        //  Assume the worst, now look for success
        rc = -1;
        //  Did we reach the end of the address safely?
        if (*check == 0) {
            //  Do we have a valid port string? (cannot be '*' in connect
            check = strrchr (address.c_str (), ':');
            if (check) {
                check++;
                if (*check && (isdigit (*check)))
                    rc = 0;     //  Valid
            }
        }
        if (rc == -1) {
791
            errno = EINVAL;
792
            LIBZMQ_DELETE(paddr);
somdoron's avatar
somdoron committed
793
            EXIT_MUTEX();
794 795 796
            return -1;
        }
        //  Defer resolution until a socket is opened
797
        paddr->resolved.tcp_addr = NULL;
798
    }
799
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
800 801
    else
    if (protocol == "ipc") {
802
        paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
803
        alloc_assert (paddr->resolved.ipc_addr);
804 805
        int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
        if (rc != 0) {
806
            LIBZMQ_DELETE(paddr);
somdoron's avatar
somdoron committed
807
            EXIT_MUTEX();
808 809 810
            return -1;
        }
    }
811
#endif
812

bebopagogo's avatar
bebopagogo committed
813
// TBD - Should we check address for ZMQ_HAVE_NORM???
814

815 816 817 818 819 820 821
#ifdef ZMQ_HAVE_OPENPGM
    if (protocol == "pgm" || protocol == "epgm") {
        struct pgm_addrinfo_t *res = NULL;
        uint16_t port_number = 0;
        int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number);
        if (res != NULL)
            pgm_freeaddrinfo (res);
822 823 824 825
        if (rc != 0 || port_number == 0) {
          EXIT_MUTEX();
          return -1;
        }
826
    }
827
#endif
828
#if defined ZMQ_HAVE_TIPC
829 830 831 832 833 834
    else
    if (protocol == "tipc") {
        paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
        alloc_assert (paddr->resolved.tipc_addr);
        int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
        if (rc != 0) {
835
            LIBZMQ_DELETE(paddr);
somdoron's avatar
somdoron committed
836
            EXIT_MUTEX();
837 838 839 840 841
            return -1;
        }
    }
#endif

842
    //  Create session.
843
    session_base_t *session = session_base_t::create (io_thread, true, this,
844
        options, paddr);
845
    errno_assert (session);
846

847
    //  PGM does not support subscription forwarding; ask for all data to be
bebopagogo's avatar
bebopagogo committed
848 849
    //  sent to this pipe. (same for NORM, currently?)
    bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm";
850
    pipe_t *newpipe = NULL;
851

852
    if (options.immediate != 1 || subscribe_to_all) {
853 854
        //  Create a bi-directional pipe.
        object_t *parents [2] = {this, session};
855
        pipe_t *new_pipes [2] = {NULL, NULL};
856 857 858 859 860 861 862 863 864 865 866

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : options.sndhwm,
            conflate? -1 : options.rcvhwm};
        bool conflates [2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
867
        rc = pipepair (parents, new_pipes, hwms, conflates);
868
        errno_assert (rc == 0);
869

870
        //  Attach local end of the pipe to the socket object.
871
        attach_pipe (new_pipes [0], subscribe_to_all);
872
        newpipe = new_pipes [0];
Martin Sustrik's avatar
Martin Sustrik committed
873

874
        //  Attach remote end of the pipe to the session object later on.
875
        session->attach_pipe (new_pipes [1]);
876 877 878
    }

    //  Save last endpoint URI
879
    paddr->to_string (last_endpoint);
880

881
    add_endpoint (addr_, (own_t *) session, newpipe);
somdoron's avatar
somdoron committed
882
    EXIT_MUTEX();
883 884 885
    return 0;
}

886
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
887
{
888
    //  Activate the session. Make it a child of this socket.
889
    launch_child (endpoint_);
Martin Hurton's avatar
Martin Hurton committed
890
    endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe)));
891 892 893 894
}

int zmq::socket_base_t::term_endpoint (const char *addr_)
{
somdoron's avatar
somdoron committed
895 896
    ENTER_MUTEX();

897 898 899
    //  Check whether the library haven't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
somdoron's avatar
somdoron committed
900
        EXIT_MUTEX();
901 902
        return -1;
    }
malosek's avatar
malosek committed
903

904
    //  Check whether endpoint address passed to the function is valid.
905 906
    if (unlikely (!addr_)) {
        errno = EINVAL;
somdoron's avatar
somdoron committed
907
        EXIT_MUTEX();
908 909 910
        return -1;
    }

911 912 913
    //  Process pending commands, if any, since there could be pending unprocessed process_own()'s
    //  (from launch_child() for example) we're asked to terminate now.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
914 915
    if (unlikely(rc != 0)) {
        EXIT_MUTEX();
916
        return -1;
somdoron's avatar
somdoron committed
917
    }
918

919 920 921
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
922 923
    if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) {
        EXIT_MUTEX();
924
        return -1;
somdoron's avatar
somdoron committed
925
    }
926 927 928

    // Disconnect an inproc socket
    if (protocol == "inproc") {
somdoron's avatar
somdoron committed
929 930
        if (unregister_endpoint (std::string(addr_), this) == 0) {
            EXIT_MUTEX();
Martin Hurton's avatar
Martin Hurton committed
931
            return 0;
somdoron's avatar
somdoron committed
932
        }
933 934 935
        std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
        if (range.first == range.second) {
            errno = ENOENT;
somdoron's avatar
somdoron committed
936
            EXIT_MUTEX();
937 938
            return -1;
        }
Martin Hurton's avatar
Martin Hurton committed
939

940
        for (inprocs_t::iterator it = range.first; it != range.second; ++it)
Martin Hurton's avatar
Martin Hurton committed
941
            it->second->terminate (true);
942
        inprocs.erase (range.first, range.second);
somdoron's avatar
somdoron committed
943
        EXIT_MUTEX();
944 945 946
        return 0;
    }

947 948
    //  Find the endpoints range (if any) corresponding to the addr_ string.
    std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
949 950
    if (range.first == range.second) {
        errno = ENOENT;
somdoron's avatar
somdoron committed
951
        EXIT_MUTEX();
952
        return -1;
953
    }
954

955 956 957
    for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
        //  If we have an associated pipe, terminate it.
        if (it->second.second != NULL)
Martin Hurton's avatar
Martin Hurton committed
958
            it->second.second->terminate (false);
959
        term_child (it->second.first);
960
    }
961
    endpoints.erase (range.first, range.second);
somdoron's avatar
somdoron committed
962
    EXIT_MUTEX();
963
    return 0;
964 965
}

966
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
967
{
somdoron's avatar
somdoron committed
968 969
    ENTER_MUTEX();

970
    //  Check whether the library haven't been shut down yet.
971
    if (unlikely (ctx_terminated)) {
972
        errno = ETERM;
somdoron's avatar
somdoron committed
973
        EXIT_MUTEX();
974 975 976
        return -1;
    }

977
    //  Check whether message passed to the function is valid.
978
    if (unlikely (!msg_ || !msg_->check ())) {
979
        errno = EFAULT;
somdoron's avatar
somdoron committed
980
        EXIT_MUTEX();
981 982 983
        return -1;
    }

984
    //  Process pending commands, if any.
985
    int rc = process_commands (0, true);
somdoron's avatar
somdoron committed
986 987
    if (unlikely (rc != 0)) {
        EXIT_MUTEX();
988
        return -1;
somdoron's avatar
somdoron committed
989
    }
990

991 992 993
    //  Clear any user-visible flags that are set on the message.
    msg_->reset_flags (msg_t::more);

994
    //  At this point we impose the flags on the message.
995
    if (flags_ & ZMQ_SNDMORE)
996
        msg_->set_flags (msg_t::more);
997

998 999
    msg_->reset_metadata ();

Martin Sustrik's avatar
Martin Sustrik committed
1000
    //  Try to send the message.
1001
    rc = xsend (msg_);
somdoron's avatar
somdoron committed
1002 1003
    if (rc == 0) {
        EXIT_MUTEX();
1004
        return 0;
somdoron's avatar
somdoron committed
1005 1006 1007
    }
    if (unlikely (errno != EAGAIN)) {
        EXIT_MUTEX();
1008
        return -1;
somdoron's avatar
somdoron committed
1009
    }
Martin Sustrik's avatar
Martin Sustrik committed
1010

1011
    //  In case of non-blocking send we'll simply propagate
1012
    //  the error - including EAGAIN - up the stack.
somdoron's avatar
somdoron committed
1013 1014
    if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) {
        EXIT_MUTEX();
Martin Sustrik's avatar
Martin Sustrik committed
1015
        return -1;
somdoron's avatar
somdoron committed
1016
    }
Martin Sustrik's avatar
Martin Sustrik committed
1017

1018
    //  Compute the time when the timeout should occur.
Matt Arsenault's avatar
Matt Arsenault committed
1019
    //  If the timeout is infinite, don't care.
1020 1021 1022
    int timeout = options.sndtimeo;
    uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);

1023 1024
    //  Oops, we couldn't send the message. Wait for the next
    //  command, process it and try to send the message again.
1025 1026
    //  If timeout is reached in the meantime, return EAGAIN.
    while (true) {
somdoron's avatar
somdoron committed
1027 1028
        if (unlikely (process_commands (timeout, false) != 0)) {
            EXIT_MUTEX();
1029
            return -1;
1030
        }
1031
        rc = xsend (msg_);
1032 1033
        if (rc == 0)
            break;
somdoron's avatar
somdoron committed
1034 1035
        if (unlikely (errno != EAGAIN)) {
            EXIT_MUTEX();
1036
            return -1;
somdoron's avatar
somdoron committed
1037
        }
1038
        if (timeout > 0) {
1039
            timeout = (int) (end - clock.now_ms ());
1040 1041
            if (timeout <= 0) {
                errno = EAGAIN;
somdoron's avatar
somdoron committed
1042
                EXIT_MUTEX();
1043 1044 1045
                return -1;
            }
        }
1046
    }
somdoron's avatar
somdoron committed
1047 1048

    EXIT_MUTEX();
Martin Sustrik's avatar
Martin Sustrik committed
1049
    return 0;
1050 1051
}

1052
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
1053
{
somdoron's avatar
somdoron committed
1054 1055
    ENTER_MUTEX();

1056
    //  Check whether the library haven't been shut down yet.
1057
    if (unlikely (ctx_terminated)) {
1058
        errno = ETERM;
somdoron's avatar
somdoron committed
1059
        EXIT_MUTEX();
1060 1061 1062
        return -1;
    }

1063
    //  Check whether message passed to the function is valid.
1064
    if (unlikely (!msg_ || !msg_->check ())) {
1065
        errno = EFAULT;
somdoron's avatar
somdoron committed
1066
        EXIT_MUTEX();
1067 1068 1069
        return -1;
    }

1070 1071 1072 1073 1074 1075 1076
    //  Once every inbound_poll_rate messages check for signals and process
    //  incoming commands. This happens only if we are not polling altogether
    //  because there are messages available all the time. If poll occurs,
    //  ticks is set to zero and thus we avoid this code.
    //
    //  Note that 'recv' uses different command throttling algorithm (the one
    //  described above) from the one used by 'send'. This is because counting
Martin Sustrik's avatar
Martin Sustrik committed
1077
    //  ticks is more efficient than doing RDTSC all the time.
1078
    if (++ticks == inbound_poll_rate) {
somdoron's avatar
somdoron committed
1079 1080
        if (unlikely (process_commands (0, false) != 0)) {
            EXIT_MUTEX();
1081
            return -1;
somdoron's avatar
somdoron committed
1082
        }
1083 1084 1085
        ticks = 0;
    }

Martin Hurton's avatar
Martin Hurton committed
1086
    //  Get the message.
1087
    int rc = xrecv (msg_);
somdoron's avatar
somdoron committed
1088 1089
    if (unlikely (rc != 0 && errno != EAGAIN)) {
        EXIT_MUTEX();
Martin Hurton's avatar
Martin Hurton committed
1090
        return -1;
somdoron's avatar
somdoron committed
1091
    }
Martin Hurton's avatar
Martin Hurton committed
1092

1093
    //  If we have the message, return immediately.
1094
    if (rc == 0) {
1095
        if (file_desc != retired_fd)
1096
            msg_->set_fd(file_desc);
1097
        extract_flags (msg_);
somdoron's avatar
somdoron committed
1098
        EXIT_MUTEX();
1099
        return 0;
1100
    }
1101

Martin Sustrik's avatar
Martin Sustrik committed
1102
    //  If the message cannot be fetched immediately, there are two scenarios.
1103 1104 1105
    //  For non-blocking recv, commands are processed in case there's an
    //  activate_reader command already waiting int a command pipe.
    //  If it's not, return EAGAIN.
1106
    if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) {
somdoron's avatar
somdoron committed
1107 1108
        if (unlikely (process_commands (0, false) != 0)) {
            EXIT_MUTEX();
1109
            return -1;
somdoron's avatar
somdoron committed
1110
        }
1111
        ticks = 0;
1112

1113
        rc = xrecv (msg_);
somdoron's avatar
somdoron committed
1114 1115
        if (rc < 0) {
            EXIT_MUTEX();
1116
            return rc;
somdoron's avatar
somdoron committed
1117
        }
1118
        if (file_desc != retired_fd)
1119
            msg_->set_fd(file_desc);
1120
        extract_flags (msg_);
somdoron's avatar
somdoron committed
1121 1122

        EXIT_MUTEX();
1123
        return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1124 1125
    }

1126
    //  Compute the time when the timeout should occur.
Matt Arsenault's avatar
Matt Arsenault committed
1127
    //  If the timeout is infinite, don't care.
1128 1129 1130
    int timeout = options.rcvtimeo;
    uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);

1131 1132
    //  In blocking scenario, commands are processed over and over again until
    //  we are able to fetch a message.
1133
    bool block = (ticks != 0);
1134
    while (true) {
somdoron's avatar
somdoron committed
1135 1136
        if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
            EXIT_MUTEX();
1137
            return -1;
somdoron's avatar
somdoron committed
1138
        }
1139
        rc = xrecv (msg_);
1140 1141 1142 1143
        if (rc == 0) {
            ticks = 0;
            break;
        }
somdoron's avatar
somdoron committed
1144 1145
        if (unlikely (errno != EAGAIN)) {
            EXIT_MUTEX();
1146
            return -1;
somdoron's avatar
somdoron committed
1147
        }
1148
        block = true;
1149
        if (timeout > 0) {
1150
            timeout = (int) (end - clock.now_ms ());
1151 1152
            if (timeout <= 0) {
                errno = EAGAIN;
somdoron's avatar
somdoron committed
1153
                EXIT_MUTEX();
1154 1155 1156
                return -1;
            }
        }
1157
    }
1158

1159
    if (file_desc != retired_fd)
1160
        msg_->set_fd(file_desc);
1161
    extract_flags (msg_);
somdoron's avatar
somdoron committed
1162
    EXIT_MUTEX();
1163
    return 0;
1164 1165 1166 1167
}

int zmq::socket_base_t::close ()
{
1168 1169
    //  Mark the socket as dead
    tag = 0xdeadbeef;
1170

1171 1172 1173 1174
    //  Transfer the ownership of the socket from this application thread
    //  to the reaper thread which will take care of the rest of shutdown
    //  process.
    send_reap (this);
1175

1176 1177 1178
    return 0;
}

1179 1180 1181 1182 1183 1184 1185 1186 1187 1188
bool zmq::socket_base_t::has_in ()
{
    return xhas_in ();
}

bool zmq::socket_base_t::has_out ()
{
    return xhas_out ();
}

1189
void zmq::socket_base_t::start_reaping (poller_t *poller_)
1190
{
1191
    //  Plug the socket to the reaper thread.
1192
    poller = poller_;
somdoron's avatar
somdoron committed
1193 1194 1195 1196 1197

    fd_t fd;

    if (!thread_safe)
        fd = ((mailbox_t*)mailbox)->get_fd();
1198
    else {
somdoron's avatar
somdoron committed
1199 1200
        ENTER_MUTEX();

1201 1202
        reaper_signaler =  new signaler_t();

somdoron's avatar
somdoron committed
1203
        //  Add signaler to the safe mailbox
1204
        fd = reaper_signaler->get_fd();
1205
        ((mailbox_safe_t*)mailbox)->add_signaler(reaper_signaler);
somdoron's avatar
somdoron committed
1206 1207

        //  Send a signal to make sure reaper handle existing commands
1208
        reaper_signaler->send();
somdoron's avatar
somdoron committed
1209 1210 1211 1212 1213

        EXIT_MUTEX();
    }

    handle = poller->add_fd (fd, this);
1214
    poller->set_pollin (handle);
1215 1216 1217 1218 1219

    //  Initialise the termination and check whether it can be deallocated
    //  immediately.
    terminate ();
    check_destroy ();
Martin Hurton's avatar
Martin Hurton committed
1220 1221
}

1222
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
Martin Sustrik's avatar
Martin Sustrik committed
1223
{
1224
    int rc;
1225
    command_t cmd;
1226 1227 1228
    if (timeout_ != 0) {

        //  If we are asked to wait, simply ask mailbox to wait.
somdoron's avatar
somdoron committed
1229
        rc = mailbox->recv (&cmd, timeout_);
1230 1231
    }
    else {
1232

1233 1234 1235
        //  If we are asked not to wait, check whether we haven't processed
        //  commands recently, so that we can throttle the new commands.

Martin Sustrik's avatar
Martin Sustrik committed
1236
        //  Get the CPU's tick counter. If 0, the counter is not available.
Martin Hurton's avatar
Martin Hurton committed
1237
        const uint64_t tsc = zmq::clock_t::rdtsc ();
Martin Sustrik's avatar
Martin Sustrik committed
1238

1239 1240 1241 1242 1243 1244
        //  Optimised version of command processing - it doesn't have to check
        //  for incoming commands each time. It does so only if certain time
        //  elapsed since last command processing. Command delay varies
        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
        //  etc. The optimisation makes sense only on platforms where getting
        //  a timestamp is a very cheap operation (tens of nanoseconds).
Martin Sustrik's avatar
Martin Sustrik committed
1245 1246
        if (tsc && throttle_) {

Martin Sustrik's avatar
Martin Sustrik committed
1247 1248 1249
            //  Check whether TSC haven't jumped backwards (in case of migration
            //  between CPU cores) and whether certain time have elapsed since
            //  last command processing. If it didn't do nothing.
Martin Sustrik's avatar
Martin Sustrik committed
1250
            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
1251
                return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1252
            last_tsc = tsc;
1253 1254 1255
        }

        //  Check whether there are any commands pending for this thread.
somdoron's avatar
somdoron committed
1256
        rc = mailbox->recv (&cmd, 0);
1257
    }
Martin Sustrik's avatar
Martin Sustrik committed
1258

1259 1260
    //  Process all available commands.
    while (rc == 0) {
1261
        cmd.destination->process_command (cmd);
somdoron's avatar
somdoron committed
1262
        rc = mailbox->recv (&cmd, 0);
1263 1264 1265 1266 1267 1268
    }

    if (errno == EINTR)
        return -1;

    zmq_assert (errno == EAGAIN);
1269

1270
    if (ctx_terminated) {
1271 1272
        errno = ETERM;
        return -1;
1273
    }
1274 1275

    return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1276 1277
}

1278
void zmq::socket_base_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
1279
{
1280
    //  Here, someone have called zmq_term while the socket was still alive.
1281
    //  We'll remember the fact so that any blocking call is interrupted and any
1282 1283
    //  further attempt to use the socket will return ETERM. The user is still
    //  responsible for calling zmq_close on the socket though!
1284
    stop_monitor ();
1285
    ctx_terminated = true;
Martin Sustrik's avatar
Martin Sustrik committed
1286 1287
}

1288
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
Martin Sustrik's avatar
Martin Sustrik committed
1289
{
1290
    attach_pipe (pipe_);
Martin Sustrik's avatar
Martin Sustrik committed
1291 1292
}

1293
void zmq::socket_base_t::process_term (int linger_)
1294
{
1295 1296 1297 1298 1299
    //  Unregister all inproc endpoints associated with this socket.
    //  Doing this we make sure that no new pipes from other sockets (inproc)
    //  will be initiated.
    unregister_endpoints (this);

1300 1301
    //  Ask all attached pipes to terminate.
    for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
1302
        pipes [i]->terminate (false);
Martin Sustrik's avatar
Martin Sustrik committed
1303
    register_term_acks ((int) pipes.size ());
1304

1305
    //  Continue the termination process immediately.
1306
    own_t::process_term (linger_);
1307 1308
}

1309 1310
void zmq::socket_base_t::update_pipe_options(int option_)
{
1311 1312 1313 1314 1315 1316 1317
    if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM)
    {
        for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
        {
            pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
        }
    }
1318 1319 1320

}

1321 1322 1323 1324 1325
void zmq::socket_base_t::process_destroy ()
{
    destroyed = true;
}

1326
int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
1327 1328 1329 1330
{
    errno = EINVAL;
    return -1;
}
1331 1332 1333 1334 1335 1336

bool zmq::socket_base_t::xhas_out ()
{
    return false;
}

1337
int zmq::socket_base_t::xsend (msg_t *)
1338 1339 1340 1341 1342 1343 1344 1345 1346 1347
{
    errno = ENOTSUP;
    return -1;
}

bool zmq::socket_base_t::xhas_in ()
{
    return false;
}

1348
int zmq::socket_base_t::xrecv (msg_t *)
1349 1350 1351 1352 1353
{
    errno = ENOTSUP;
    return -1;
}

1354 1355 1356 1357 1358
zmq::blob_t zmq::socket_base_t::get_credential () const
{
    return blob_t ();
}

1359
void zmq::socket_base_t::xread_activated (pipe_t *)
1360 1361 1362
{
    zmq_assert (false);
}
1363
void zmq::socket_base_t::xwrite_activated (pipe_t *)
1364 1365 1366 1367
{
    zmq_assert (false);
}

1368
void zmq::socket_base_t::xhiccuped (pipe_t *)
1369
{
1370
    zmq_assert (false);
1371 1372
}

1373 1374
void zmq::socket_base_t::in_event ()
{
1375 1376 1377 1378
    //  This function is invoked only once the socket is running in the context
    //  of the reaper thread. Process any commands from other threads/sockets
    //  that may be available at the moment. Ultimately, the socket will
    //  be destroyed.
somdoron's avatar
somdoron committed
1379 1380

    ENTER_MUTEX();
1381

1382 1383 1384
    //  If the socket is thread safe we need to unsignal the reaper signaler
    if (thread_safe)
        reaper_signaler->recv();
1385

1386
    process_commands (0, false);
somdoron's avatar
somdoron committed
1387
    EXIT_MUTEX();
1388 1389 1390 1391 1392 1393 1394 1395
    check_destroy ();
}

void zmq::socket_base_t::out_event ()
{
    zmq_assert (false);
}

1396
void zmq::socket_base_t::timer_event (int)
1397 1398 1399
{
    zmq_assert (false);
}
1400

1401 1402
void zmq::socket_base_t::check_destroy ()
{
1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418
    //  If the object was already marked as destroyed, finish the deallocation.
    if (destroyed) {

        //  Remove the socket from the reaper's poller.
        poller->rm_fd (handle);

        //  Remove the socket from the context.
        destroy_socket (this);

        //  Notify the reaper about the fact.
        send_reaped ();

        //  Deallocate.
        own_t::process_destroy ();
    }
}
1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429

void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
    xread_activated (pipe_);
}

void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
    xwrite_activated (pipe_);
}

1430 1431
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
{
1432
    if (options.immediate == 1)
1433 1434 1435 1436
        pipe_->terminate (false);
    else
        // Notify derived sockets of the hiccup
        xhiccuped (pipe_);
1437 1438
}

1439
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
1440 1441
{
    //  Notify the specific socket type about the pipe termination.
1442
    xpipe_terminated (pipe_);
1443

1444
    // Remove pipe from inproc pipes
Martin Hurton's avatar
Martin Hurton committed
1445
    for (inprocs_t::iterator it = inprocs.begin (); it != inprocs.end (); ++it)
1446
        if (it->second == pipe_) {
Martin Hurton's avatar
Martin Hurton committed
1447
            inprocs.erase (it);
1448
            break;
1449 1450
        }

1451 1452 1453 1454 1455 1456 1457
    //  Remove the pipe from the list of attached pipes and confirm its
    //  termination if we are already shutting down.
    pipes.erase (pipe_);
    if (is_terminating ())
        unregister_term_ack ();
}

1458 1459
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
1460
    //  Test whether IDENTITY flag is valid for this socket type.
1461
    if (unlikely (msg_->flags () & msg_t::identity))
1462
        zmq_assert (options.recv_identity);
Martin Hurton's avatar
Martin Hurton committed
1463

1464
    //  Remove MORE flag.
1465 1466
    rcvmore = msg_->flags () & msg_t::more ? true : false;
}
1467

1468
int zmq::socket_base_t::monitor (const char *addr_, int events_)
1469
{
1470 1471 1472 1473
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }
1474
    //  Support deregistering monitoring endpoints as well
1475 1476 1477 1478 1479 1480 1481
    if (addr_ == NULL) {
        stop_monitor ();
        return 0;
    }
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
Pieter Hintjens's avatar
Pieter Hintjens committed
1482
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
1483 1484
        return -1;

1485
    //  Event notification only supported over inproc://
1486 1487 1488 1489
    if (protocol != "inproc") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
1490
    //  Register events to monitor
1491
    monitor_events = events_;
Martin Hurton's avatar
Martin Hurton committed
1492
    monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
1493 1494 1495
    if (monitor_socket == NULL)
        return -1;

1496
    //  Never block context termination on pending event messages
1497
    int linger = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
1498
    int rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1499
    if (rc == -1)
Pieter Hintjens's avatar
Pieter Hintjens committed
1500
        stop_monitor ();
1501

1502
    //  Spawn the monitor socket endpoint
1503 1504 1505 1506 1507 1508
    rc = zmq_bind (monitor_socket, addr_);
    if (rc == -1)
         stop_monitor ();
    return rc;
}

1509 1510 1511 1512 1513 1514 1515 1516 1517 1518
void zmq::socket_base_t::set_fd(zmq::fd_t fd_)
{
    file_desc = fd_;
}

zmq::fd_t zmq::socket_base_t::fd()
{
    return file_desc;
}

Martin Hurton's avatar
Martin Hurton committed
1519
void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_)
1520
{
1521 1522
    if (monitor_events & ZMQ_EVENT_CONNECTED)
        monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_);
1523
}
1524

Martin Hurton's avatar
Martin Hurton committed
1525
void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_)
1526
{
1527 1528
    if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED)
        monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_);
1529
}
1530

Martin Hurton's avatar
Martin Hurton committed
1531
void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_)
1532
{
1533 1534
    if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED)
        monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_);
1535 1536
}

Martin Hurton's avatar
Martin Hurton committed
1537
void zmq::socket_base_t::event_listening (const std::string &addr_, int fd_)
1538
{
1539 1540
    if (monitor_events & ZMQ_EVENT_LISTENING)
        monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_);
1541 1542
}

Martin Hurton's avatar
Martin Hurton committed
1543
void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
1544
{
1545 1546
    if (monitor_events & ZMQ_EVENT_BIND_FAILED)
        monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_);
1547 1548
}

Martin Hurton's avatar
Martin Hurton committed
1549
void zmq::socket_base_t::event_accepted (const std::string &addr_, int fd_)
1550
{
1551 1552
    if (monitor_events & ZMQ_EVENT_ACCEPTED)
        monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_);
1553 1554
}

Martin Hurton's avatar
Martin Hurton committed
1555
void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_)
1556
{
1557 1558
    if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED)
        monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_);
1559 1560
}

Martin Hurton's avatar
Martin Hurton committed
1561
void zmq::socket_base_t::event_closed (const std::string &addr_, int fd_)
1562
{
1563 1564
    if (monitor_events & ZMQ_EVENT_CLOSED)
        monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_);
1565
}
Martin Hurton's avatar
Martin Hurton committed
1566 1567

void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
1568
{
1569 1570
    if (monitor_events & ZMQ_EVENT_CLOSE_FAILED)
        monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_);
1571 1572
}

Martin Hurton's avatar
Martin Hurton committed
1573
void zmq::socket_base_t::event_disconnected (const std::string &addr_, int fd_)
1574
{
1575 1576
    if (monitor_events & ZMQ_EVENT_DISCONNECTED)
        monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_);
1577 1578
}

1579 1580
//  Send a monitor event
void zmq::socket_base_t::monitor_event (int event_, int value_, const std::string &addr_)
1581
{
1582
    if (monitor_socket) {
1583
        //  Send event in first frame
1584
        zmq_msg_t msg;
1585 1586
        zmq_msg_init_size (&msg, 6);
        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
1587 1588 1589 1590 1591
        //  Avoid dereferencing uint32_t on unaligned address
        uint16_t event = (uint16_t) event_;
        uint32_t value = (uint32_t) value_;
        memcpy (data + 0, &event, sizeof(event));
        memcpy (data + 2, &value, sizeof(value));
1592
        zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
1593 1594

        //  Send address in second frame
1595
        zmq_msg_init_size (&msg, addr_.size());
1596
        memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
1597 1598
        zmq_sendmsg (monitor_socket, &msg, 0);
    }
1599 1600
}

1601
void zmq::socket_base_t::stop_monitor (void)
1602 1603
{
    if (monitor_socket) {
1604 1605
        if (monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
            monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
1606 1607 1608 1609
        zmq_close (monitor_socket);
        monitor_socket = NULL;
        monitor_events = 0;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
1610
}