socket_base.cpp 44.2 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include <new>
31
#include <string>
32 33
#include <algorithm>

34 35 36 37 38
#include "platform.hpp"

#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#if defined _MSC_VER
39
#if defined _WIN32_WCE
boris@boressoft.ru's avatar
boris@boressoft.ru committed
40 41
#include <cmnintrin.h>
#else
42 43
#include <intrin.h>
#endif
boris@boressoft.ru's avatar
boris@boressoft.ru committed
44
#endif
45 46 47
#else
#include <unistd.h>
#endif
48

49
#include "socket_base.hpp"
50
#include "tcp_listener.hpp"
51
#include "ipc_listener.hpp"
52
#include "tipc_listener.hpp"
53
#include "tcp_connecter.hpp"
54
#include "io_thread.hpp"
55
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
56
#include "config.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
57
#include "pipe.hpp"
58
#include "err.hpp"
59
#include "ctx.hpp"
malosek's avatar
malosek committed
60
#include "platform.hpp"
61
#include "likely.hpp"
62
#include "msg.hpp"
63 64 65
#include "address.hpp"
#include "ipc_address.hpp"
#include "tcp_address.hpp"
66
#include "tipc_address.hpp"
somdoron's avatar
somdoron committed
67 68
#include "mailbox.hpp"
#include "mailbox_safe.hpp"
69 70 71
#ifdef ZMQ_HAVE_OPENPGM
#include "pgm_socket.hpp"
#endif
72

73 74 75 76 77 78 79
#include "pair.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "pull.hpp"
#include "push.hpp"
80 81
#include "dealer.hpp"
#include "router.hpp"
82 83
#include "xpub.hpp"
#include "xsub.hpp"
84
#include "stream.hpp"
85
#include "server.hpp"
86
#include "client.hpp"
87

somdoron's avatar
somdoron committed
88 89 90 91 92 93 94 95
//  Locks the socket's 'sync' object, but only when the socket was created
//  with thread_safe set. Note that the macro expands to a bare 'if'
//  statement, so it must be used as a standalone statement and never as
//  the unbraced body of an if/else.
#define ENTER_MUTEX() \
    if (thread_safe) \
        sync.lock();

//  Unlocks the socket's 'sync' object; counterpart of ENTER_MUTEX ().
//  Same expansion caveat as above applies.
#define EXIT_MUTEX() \
    if (thread_safe) \
        sync.unlock();

96 97 98 99 100
//  Tells whether this object still carries the magic tag value that the
//  constructor stores in 'tag'.
bool zmq::socket_base_t::check_tag ()
{
    const bool tag_intact = (tag == 0xbaddecaf);
    return tag_intact;
}

101
//  Factory for concrete socket objects.
//
//  type_   - one of the ZMQ_* socket type constants handled below.
//  parent_ - context the socket belongs to.
//  tid_    - thread ID passed through to the socket constructor.
//  sid_    - socket ID passed through to the socket constructor.
//
//  Returns the new socket, or NULL when the type is unknown (errno is set
//  to EINVAL) or when the socket's mailbox has no usable file descriptor.
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
    uint32_t tid_, int sid_)
{
    socket_base_t *s = NULL;
    switch (type_) {
        case ZMQ_PAIR:
            s = new (std::nothrow) pair_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUB:
            s = new (std::nothrow) pub_t (parent_, tid_, sid_);
            break;
        case ZMQ_SUB:
            s = new (std::nothrow) sub_t (parent_, tid_, sid_);
            break;
        case ZMQ_REQ:
            s = new (std::nothrow) req_t (parent_, tid_, sid_);
            break;
        case ZMQ_REP:
            s = new (std::nothrow) rep_t (parent_, tid_, sid_);
            break;
        case ZMQ_DEALER:
            s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
            break;
        case ZMQ_ROUTER:
            s = new (std::nothrow) router_t (parent_, tid_, sid_);
            break;
        case ZMQ_PULL:
            s = new (std::nothrow) pull_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUSH:
            s = new (std::nothrow) push_t (parent_, tid_, sid_);
            break;
        case ZMQ_XPUB:
            s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
            break;
        case ZMQ_XSUB:
            s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
            break;
        case ZMQ_STREAM:
            s = new (std::nothrow) stream_t (parent_, tid_, sid_);
            break;
        case ZMQ_SERVER:
            s = new (std::nothrow) server_t (parent_, tid_, sid_);
            break;
        case ZMQ_CLIENT:
            s = new (std::nothrow) client_t (parent_, tid_, sid_);
            break;
        default:
            errno = EINVAL;
            return NULL;
    }

    alloc_assert (s);

    //  Only the plain mailbox_t exposes a file descriptor; for any other
    //  mailbox implementation the cast yields NULL and the check is skipped.
    mailbox_t *mailbox = dynamic_cast<mailbox_t*> (s->mailbox);

    if (mailbox != NULL && mailbox->get_fd () == retired_fd) {
        //  The socket is unusable. Dispose of it instead of leaking it.
        //  The destructor asserts 'destroyed', so flag the object first.
        //  Keep errno as it was so that the caller still sees the original
        //  cause of the failure rather than whatever cleanup left behind.
        const int saved_errno = errno;
        s->destroyed = true;
        delete s;
        errno = saved_errno;
        return NULL;
    }

    return s;
}

somdoron's avatar
somdoron committed
163
//  Sets up the state shared by all socket types and allocates the mailbox
//  implementation that matches 'thread_safe_'.
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
    own_t (parent_, tid_),
    tag (0xbaddecaf),
    ctx_terminated (false),
    destroyed (false),
    last_tsc (0),
    ticks (0),
    rcvmore (false),
    file_desc(-1),
    monitor_socket (NULL),
    monitor_events (0),
    thread_safe (thread_safe_),
    reaper_signaler (NULL)
{
    options.socket_id = sid_;
    options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);

    //  Default linger is -1 when the context has ZMQ_BLOCKY set, 0 otherwise.
    if (parent_->get (ZMQ_BLOCKY))
        options.linger = -1;
    else
        options.linger = 0;

    //  Thread-safe sockets get the mailbox variant that shares our 'sync'
    //  object; all other sockets use the plain mailbox.
    if (!thread_safe) {
        mailbox = new mailbox_t ();
    }
    else {
        mailbox = new mailbox_safe_t (&sync);
    }
}

//  Releases the mailbox and the reaper signaler, stops the monitor and
//  checks that the socket has been flagged as destroyed beforehand.
zmq::socket_base_t::~socket_base_t ()
{
    delete mailbox;

    //  Deleting a null pointer is a no-op, so no explicit guard is needed.
    delete reaper_signaler;

    stop_monitor ();
    zmq_assert (destroyed);
}

somdoron's avatar
somdoron committed
198
//  Gives access to the mailbox allocated by the constructor.
zmq::i_mailbox *zmq::socket_base_t::get_mailbox ()
{
    i_mailbox *const socket_mailbox = mailbox;
    return socket_mailbox;
}

void zmq::socket_base_t::stop ()
{
    //  Called by ctx when it is terminated (zmq_term).
    //  'stop' command is sent from the threads that called zmq_term to
    //  the thread owning the socket. This way, blocking call in the
    //  owner thread can be interrupted.
    send_stop ();
}

212 213 214 215 216 217 218 219 220 221 222 223 224
//  Splits 'uri_' of the form "protocol://address" into its two parts.
//
//  Returns 0 when both parts are non-empty. Otherwise returns -1 with errno
//  set to EINVAL; the output strings are left untouched if the "://"
//  separator is missing altogether.
int zmq::socket_base_t::parse_uri (const char *uri_,
                        std::string &protocol_, std::string &address_)
{
    zmq_assert (uri_ != NULL);

    static const char separator [] = "://";
    const std::string whole (uri_);
    const std::string::size_type split = whole.find (separator);

    if (split != std::string::npos) {
        //  Everything before the separator is the protocol, everything
        //  after it (skipping its three characters) is the address.
        protocol_.assign (whole, 0, split);
        address_.assign (whole, split + 3, std::string::npos);

        if (!protocol_.empty () && !address_.empty ())
            return 0;
    }

    errno = EINVAL;
    return -1;
}

233 234 235
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
    //  First check out whether the protcol is something we are aware of.
Pieter Hintjens's avatar
Pieter Hintjens committed
236 237 238 239 240 241 242
    if (protocol_ != "inproc"
    &&  protocol_ != "ipc"
    &&  protocol_ != "tcp"
    &&  protocol_ != "pgm"
    &&  protocol_ != "epgm"
    &&  protocol_ != "tipc"
    &&  protocol_ != "norm") {
243 244 245 246 247 248 249 250 251 252 253
        errno = EPROTONOSUPPORT;
        return -1;
    }
    //  If 0MQ is not compiled with OpenPGM, pgm and epgm transports
    //  are not avaialble.
#if !defined ZMQ_HAVE_OPENPGM
    if (protocol_ == "pgm" || protocol_ == "epgm") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif
Martin Hurton's avatar
Martin Hurton committed
254

bebopagogo's avatar
bebopagogo committed
255 256 257 258 259 260
#if !defined ZMQ_HAVE_NORM
    if (protocol_ == "norm") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif // !ZMQ_HAVE_NORM
261 262 263

    //  IPC transport is not available on Windows and OpenVMS.
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
264
    if (protocol_ == "ipc") {
265 266 267 268 269 270
        //  Unknown protocol.
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

271
    // TIPC transport is only available on Linux.
272
#if !defined ZMQ_HAVE_TIPC
Erik Hugne's avatar
Erik Hugne committed
273
    if (protocol_ == "tipc") {
274 275 276 277 278
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

279 280 281
    //  Check whether socket type and transport protocol match.
    //  Specifically, multicast protocols can't be combined with
    //  bi-directional messaging patterns (socket types).
bebopagogo's avatar
bebopagogo committed
282
    if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") &&
283 284
          options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
          options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
285 286 287 288 289 290 291 292
        errno = ENOCOMPATPROTO;
        return -1;
    }

    //  Protocol is available.
    return 0;
}

293
//  Registers 'pipe_' with this socket and hands it over to the concrete
//  socket type. 'subscribe_to_all_' is forwarded to xattach_pipe as is.
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
    //  Register the pipe first so that it can be terminated later on.
    pipe_->set_event_sink (this);
    pipes.push_back (pipe_);

    //  Tell the derived socket type about the new pipe.
    xattach_pipe (pipe_, subscribe_to_all_);

    //  Nothing more to do unless the socket is already being closed.
    if (!is_terminating ())
        return;

    //  The socket is shutting down: ask the new pipe to terminate straight
    //  away and account for its termination acknowledgement.
    register_term_acks (1);
    pipe_->terminate (false);
}

310
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
311
    size_t optvallen_)
312
{
somdoron's avatar
somdoron committed
313 314
    ENTER_MUTEX();

315 316 317 318 319 320 321
	if (!options.is_valid(option_)) {
		errno = EINVAL;
		EXIT_MUTEX();
		return -1;
	}


322
    if (unlikely (ctx_terminated)) {
323
        errno = ETERM;
somdoron's avatar
somdoron committed
324
        EXIT_MUTEX();
325 326 327
        return -1;
    }

328 329
    //  First, check whether specific socket type overloads the option.
    int rc = xsetsockopt (option_, optval_, optvallen_);
somdoron's avatar
somdoron committed
330 331
    if (rc == 0 || errno != EINVAL) {
        EXIT_MUTEX();
332
        return rc;
somdoron's avatar
somdoron committed
333
    }
334 335 336

    //  If the socket type doesn't support the option, pass it to
    //  the generic option parser.
somdoron's avatar
somdoron committed
337
    rc = options.setsockopt (option_, optval_, optvallen_);
338
	update_pipe_options(option_);
somdoron's avatar
somdoron committed
339 340 341

    EXIT_MUTEX();
    return rc;
342 343
}

344 345 346
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
    size_t *optvallen_)
{
somdoron's avatar
somdoron committed
347 348
    ENTER_MUTEX();

349
    if (unlikely (ctx_terminated)) {
350
        errno = ETERM;
somdoron's avatar
somdoron committed
351
        EXIT_MUTEX();
352 353 354
        return -1;
    }

355
    if (option_ == ZMQ_RCVMORE) {
356
        if (*optvallen_ < sizeof (int)) {
357
            errno = EINVAL;
somdoron's avatar
somdoron committed
358
            EXIT_MUTEX();
359 360
            return -1;
        }
361 362
        *((int*) optval_) = rcvmore ? 1 : 0;
        *optvallen_ = sizeof (int);
somdoron's avatar
somdoron committed
363
        EXIT_MUTEX();
364 365 366
        return 0;
    }

367 368 369
    if (option_ == ZMQ_FD) {
        if (*optvallen_ < sizeof (fd_t)) {
            errno = EINVAL;
somdoron's avatar
somdoron committed
370
            EXIT_MUTEX();
371 372
            return -1;
        }
somdoron's avatar
somdoron committed
373 374 375 376 377 378 379 380 381 382 383 384

        if (thread_safe) {
            // thread safe socket doesn't provide file descriptor
            errno = EINVAL;
            EXIT_MUTEX();
            return -1;
        }
        
        *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
        *optvallen_ = sizeof(fd_t);                      
        
        EXIT_MUTEX();
385 386 387 388
        return 0;
    }

    if (option_ == ZMQ_EVENTS) {
389
        if (*optvallen_ < sizeof (int)) {
390
            errno = EINVAL;
somdoron's avatar
somdoron committed
391
            EXIT_MUTEX();
392 393
            return -1;
        }
394
        int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
395 396
        if (rc != 0 && (errno == EINTR || errno == ETERM)) {
            EXIT_MUTEX();
397
            return -1;
somdoron's avatar
somdoron committed
398
        }
399
        errno_assert (rc == 0);
400
        *((int*) optval_) = 0;
401
        if (has_out ())
402
            *((int*) optval_) |= ZMQ_POLLOUT;
403
        if (has_in ())
404 405
            *((int*) optval_) |= ZMQ_POLLIN;
        *optvallen_ = sizeof (int);
somdoron's avatar
somdoron committed
406
        EXIT_MUTEX();
407 408 409
        return 0;
    }

410 411 412
    if (option_ == ZMQ_LAST_ENDPOINT) {
        if (*optvallen_ < last_endpoint.size () + 1) {
            errno = EINVAL;
somdoron's avatar
somdoron committed
413
            EXIT_MUTEX();
414 415 416 417
            return -1;
        }
        strcpy (static_cast <char *> (optval_), last_endpoint.c_str ());
        *optvallen_ = last_endpoint.size () + 1;
somdoron's avatar
somdoron committed
418
        EXIT_MUTEX();
419 420 421
        return 0;
    }

somdoron's avatar
somdoron committed
422 423 424
    int rc = options.getsockopt (option_, optval_, optvallen_);
    EXIT_MUTEX();
    return rc;
425 426
}

427 428
int zmq::socket_base_t::bind (const char *addr_)
{
somdoron's avatar
somdoron committed
429 430
    ENTER_MUTEX();

431
    if (unlikely (ctx_terminated)) {
432
        errno = ETERM;
somdoron's avatar
somdoron committed
433
        EXIT_MUTEX();
434 435 436
        return -1;
    }

437 438
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
439 440
    if (unlikely (rc != 0)) {
        EXIT_MUTEX();
441
        return -1;
somdoron's avatar
somdoron committed
442
    }
443

444
    //  Parse addr_ string.
445 446
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
447 448
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
        EXIT_MUTEX();
449
        return -1;
somdoron's avatar
somdoron committed
450
    }
451

Pieter Hintjens's avatar
Pieter Hintjens committed
452
    if (protocol == "inproc") {
453
        const endpoint_t endpoint = { this, options };
Martin Hurton's avatar
Martin Hurton committed
454
        const int rc = register_endpoint (addr_, endpoint);
455
        if (rc == 0) {
Martin Hurton's avatar
Martin Hurton committed
456
            connect_pending (addr_, this);
457
            last_endpoint.assign (addr_);
458
            options.connected = true;
459
        }
somdoron's avatar
somdoron committed
460
        EXIT_MUTEX();
461
        return rc;
462
    }
463

bebopagogo's avatar
bebopagogo committed
464
    if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
465
        //  For convenience's sake, bind can be used interchageable with
bebopagogo's avatar
bebopagogo committed
466
        //  connect for PGM, EPGM and NORM transports.
somdoron's avatar
somdoron committed
467
        EXIT_MUTEX();
468 469 470 471
        rc = connect (addr_);
        if (rc != -1)
            options.connected = true;
        return rc;
472 473 474 475 476 477 478
    }

    //  Remaining trasnports require to be run in an I/O thread, so at this
    //  point we'll choose one.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
somdoron's avatar
somdoron committed
479
        EXIT_MUTEX();
480 481
        return -1;
    }
482

483
    if (protocol == "tcp") {
484
        tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
485
            io_thread, this, options);
486
        alloc_assert (listener);
487
        int rc = listener->set_address (address.c_str ());
488 489
        if (rc != 0) {
            delete listener;
490
            event_bind_failed (address, zmq_errno());
somdoron's avatar
somdoron committed
491
            EXIT_MUTEX();
492
            return -1;
493
        }
494

495
        // Save last endpoint URI
496
        listener->get_address (last_endpoint);
497

498
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
499
        options.connected = true;
somdoron's avatar
somdoron committed
500
        EXIT_MUTEX();
501 502 503
        return 0;
    }

504
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
505 506 507 508
    if (protocol == "ipc") {
        ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
509
        int rc = listener->set_address (address.c_str ());
510 511
        if (rc != 0) {
            delete listener;
512
            event_bind_failed (address, zmq_errno());
somdoron's avatar
somdoron committed
513
            EXIT_MUTEX();
514 515
            return -1;
        }
516

517
        // Save last endpoint URI
518
        listener->get_address (last_endpoint);
519

520
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
521
        options.connected = true;
somdoron's avatar
somdoron committed
522
        EXIT_MUTEX();
523
        return 0;
524
    }
525
#endif
526
#if defined ZMQ_HAVE_TIPC
527 528 529 530 531 532 533 534
    if (protocol == "tipc") {
         tipc_listener_t *listener = new (std::nothrow) tipc_listener_t (
              io_thread, this, options);
         alloc_assert (listener);
         int rc = listener->set_address (address.c_str ());
         if (rc != 0) {
             delete listener;
             event_bind_failed (address, zmq_errno());
somdoron's avatar
somdoron committed
535
             EXIT_MUTEX();
536 537 538 539 540 541 542
             return -1;
         }

        // Save last endpoint URI
        listener->get_address (last_endpoint);

        add_endpoint (addr_, (own_t *) listener, NULL);
543
        options.connected = true;
somdoron's avatar
somdoron committed
544
        EXIT_MUTEX();
545 546 547
        return 0;
    }
#endif
548

somdoron's avatar
somdoron committed
549
    EXIT_MUTEX();
550
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
551
    return -1;
552 553
}

554
int zmq::socket_base_t::connect (const char *addr_)
555
{
somdoron's avatar
somdoron committed
556 557
    ENTER_MUTEX();

558
    if (unlikely (ctx_terminated)) {
559
        errno = ETERM;
somdoron's avatar
somdoron committed
560
        EXIT_MUTEX();
561 562 563
        return -1;
    }

564 565
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
566 567
    if (unlikely (rc != 0)) {
        EXIT_MUTEX();
568
        return -1;
somdoron's avatar
somdoron committed
569
    }
570

malosek's avatar
malosek committed
571
    //  Parse addr_ string.
572 573
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
574 575
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
        EXIT_MUTEX();
576
        return -1;
somdoron's avatar
somdoron committed
577
    }
malosek's avatar
malosek committed
578

Pieter Hintjens's avatar
Pieter Hintjens committed
579
    if (protocol == "inproc") {
580

581 582 583 584
        //  TODO: inproc connect is specific with respect to creating pipes
        //  as there's no 'reconnect' functionality implemented. Once that
        //  is in place we should follow generic pipe creation algorithm.

585 586
        //  Find the peer endpoint.
        endpoint_t peer = find_endpoint (addr_);
587

588
        // The total HWM for an inproc connection should be the sum of
589
        // the binder's HWM and the connector's HWM.
Martin Hurton's avatar
Martin Hurton committed
590
        int sndhwm = 0;
591 592 593
        if (peer.socket == NULL)
            sndhwm = options.sndhwm;
        else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
594
            sndhwm = options.sndhwm + peer.options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
595
        int rcvhwm = 0;
596 597
        if (peer.socket == NULL)
            rcvhwm = options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
598 599
        else
        if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
600
            rcvhwm = options.rcvhwm + peer.options.sndhwm;
601

602
        //  Create a bi-directional pipe to connect the peers.
603
        object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
604
        pipe_t *new_pipes [2] = {NULL, NULL};
605 606 607 608 609 610 611 612 613 614

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
        bool conflates [2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
615
        int rc = pipepair (parents, new_pipes, hwms, conflates);
616 617 618 619 620
        if (!conflate) {
            new_pipes[0]->set_hwms_boost(peer.options.sndhwm, peer.options.rcvhwm);
            new_pipes[1]->set_hwms_boost(options.sndhwm, options.rcvhwm);
        }

621
        errno_assert (rc == 0);
622

623 624 625 626 627 628 629 630 631 632 633 634 635 636
        if (!peer.socket) {
            //  The peer doesn't exist yet so we don't know whether
            //  to send the identity message or not. To resolve this,
            //  we always send our identity and drop it later if
            //  the peer doesn't expect it.
            msg_t id;
            rc = id.init_size (options.identity_size);
            errno_assert (rc == 0);
            memcpy (id.data (), options.identity, options.identity_size);
            id.set_flags (msg_t::identity);
            bool written = new_pipes [0]->write (&id);
            zmq_assert (written);
            new_pipes [0]->flush ();

Martin Hurton's avatar
Martin Hurton committed
637 638
            const endpoint_t endpoint = {this, options};
            pend_connection (std::string (addr_), endpoint, new_pipes);
639
        }
Martin Hurton's avatar
Martin Hurton committed
640
        else {
641 642 643 644 645 646 647 648 649 650 651
            //  If required, send the identity of the local socket to the peer.
            if (peer.options.recv_identity) {
                msg_t id;
                rc = id.init_size (options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), options.identity, options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [0]->write (&id);
                zmq_assert (written);
                new_pipes [0]->flush ();
            }
652

653 654 655 656 657 658 659 660 661 662 663
            //  If required, send the identity of the peer to the local socket.
            if (options.recv_identity) {
                msg_t id;
                rc = id.init_size (peer.options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), peer.options.identity, peer.options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [1]->write (&id);
                zmq_assert (written);
                new_pipes [1]->flush ();
            }
664

665 666 667 668 669
            //  Attach remote end of the pipe to the peer socket. Note that peer's
            //  seqnum was incremented in find_endpoint function. We don't need it
            //  increased here.
            send_bind (peer.socket, new_pipes [1], false);
        }
670

Martin Hurton's avatar
Martin Hurton committed
671 672 673
        //  Attach local end of the pipe to this socket object.
        attach_pipe (new_pipes [0]);

674
        // Save last endpoint URI
675
        last_endpoint.assign (addr_);
676

677
        // remember inproc connections for disconnect
Martin Hurton's avatar
Martin Hurton committed
678
        inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
679

680
        options.connected = true;
somdoron's avatar
somdoron committed
681
        EXIT_MUTEX();
682 683
        return 0;
    }
684 685 686 687
    bool is_single_connect = (options.type == ZMQ_DEALER ||
                              options.type == ZMQ_SUB ||
                              options.type == ZMQ_REQ);
    if (unlikely (is_single_connect)) {
Martin Hurton's avatar
Martin Hurton committed
688
        const endpoints_t::iterator it = endpoints.find (addr_);
689 690 691 692
        if (it != endpoints.end ()) {
            // There is no valid use for multiple connects for SUB-PUB nor
            // DEALER-ROUTER nor REQ-REP. Multiple connects produces
            // nonsensical results.
somdoron's avatar
somdoron committed
693
            EXIT_MUTEX();
694 695 696
            return 0;
        }
    }
697

698 699 700 701
    //  Choose the I/O thread to run the session in.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
somdoron's avatar
somdoron committed
702
        EXIT_MUTEX();
703 704 705
        return -1;
    }

706
    address_t *paddr = new (std::nothrow) address_t (protocol, address);
707
    alloc_assert (paddr);
708 709 710

    //  Resolve address (if needed by the protocol)
    if (protocol == "tcp") {
711 712 713 714 715 716 717 718 719
        //  Do some basic sanity checks on tcp:// address syntax
        //  - hostname starts with digit or letter, with embedded '-' or '.'
        //  - IPv6 address may contain hex chars and colons.
        //  - IPv4 address may contain decimal digits and dots.
        //  - Address must end in ":port" where port is *, or numeric
        //  - Address may contain two parts separated by ':'
        //  Following code is quick and dirty check to catch obvious errors,
        //  without trying to be fully accurate.
        const char *check = address.c_str ();
720
        if (isalnum (*check) || isxdigit (*check) || *check == '[') {
721 722 723
            check++;
            while (isalnum  (*check)
                || isxdigit (*check)
724 725
                || *check == '.' || *check == '-' || *check == ':'|| *check == ';'
                || *check == ']')
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740
                check++;
        }
        //  Assume the worst, now look for success
        rc = -1;
        //  Did we reach the end of the address safely?
        if (*check == 0) {
            //  Do we have a valid port string? (cannot be '*' in connect
            check = strrchr (address.c_str (), ':');
            if (check) {
                check++;
                if (*check && (isdigit (*check)))
                    rc = 0;     //  Valid
            }
        }
        if (rc == -1) {
741
            errno = EINVAL;
742
            delete paddr;
somdoron's avatar
somdoron committed
743
            EXIT_MUTEX();
744 745 746
            return -1;
        }
        //  Defer resolution until a socket is opened
747
        paddr->resolved.tcp_addr = NULL;
748
    }
749
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
750 751
    else
    if (protocol == "ipc") {
752
        paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
753
        alloc_assert (paddr->resolved.ipc_addr);
754 755 756
        int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
        if (rc != 0) {
            delete paddr;
somdoron's avatar
somdoron committed
757
            EXIT_MUTEX();
758 759 760
            return -1;
        }
    }
761
#endif
bebopagogo's avatar
bebopagogo committed
762 763 764
    
// TBD - Should we check address for ZMQ_HAVE_NORM???
    
765 766 767 768 769 770 771
#ifdef ZMQ_HAVE_OPENPGM
    if (protocol == "pgm" || protocol == "epgm") {
        struct pgm_addrinfo_t *res = NULL;
        uint16_t port_number = 0;
        int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number);
        if (res != NULL)
            pgm_freeaddrinfo (res);
772 773 774 775
        if (rc != 0 || port_number == 0) {
          EXIT_MUTEX();
          return -1;
        }
776
    }
777
#endif
778
#if defined ZMQ_HAVE_TIPC
779 780 781 782 783 784 785
    else
    if (protocol == "tipc") {
        paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
        alloc_assert (paddr->resolved.tipc_addr);
        int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
        if (rc != 0) {
            delete paddr;
somdoron's avatar
somdoron committed
786
            EXIT_MUTEX();
787 788 789 790 791
            return -1;
        }
    }
#endif

792
    //  Create session.
793
    session_base_t *session = session_base_t::create (io_thread, true, this,
794
        options, paddr);
795
    errno_assert (session);
796

797
    //  PGM does not support subscription forwarding; ask for all data to be
bebopagogo's avatar
bebopagogo committed
798 799
    //  sent to this pipe. (same for NORM, currently?)
    bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm";
800
    pipe_t *newpipe = NULL;
801

802
    if (options.immediate != 1 || subscribe_to_all) {
803 804
        //  Create a bi-directional pipe.
        object_t *parents [2] = {this, session};
805
        pipe_t *new_pipes [2] = {NULL, NULL};
806 807 808 809 810 811 812 813 814 815 816

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : options.sndhwm,
            conflate? -1 : options.rcvhwm};
        bool conflates [2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
817
        rc = pipepair (parents, new_pipes, hwms, conflates);
818
        errno_assert (rc == 0);
819

820
        //  Attach local end of the pipe to the socket object.
821
        attach_pipe (new_pipes [0], subscribe_to_all);
822
        newpipe = new_pipes [0];
Martin Sustrik's avatar
Martin Sustrik committed
823

824
        //  Attach remote end of the pipe to the session object later on.
825
        session->attach_pipe (new_pipes [1]);
826 827 828
    }

    //  Save last endpoint URI
829
    paddr->to_string (last_endpoint);
830

831
    add_endpoint (addr_, (own_t *) session, newpipe);
somdoron's avatar
somdoron committed
832
    EXIT_MUTEX();
833 834 835
    return 0;
}

836
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
837
{
838
    //  Activate the session. Make it a child of this socket.
839
    launch_child (endpoint_);
Martin Hurton's avatar
Martin Hurton committed
840
    endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe)));
841 842 843 844
}

int zmq::socket_base_t::term_endpoint (const char *addr_)
{
somdoron's avatar
somdoron committed
845 846
    ENTER_MUTEX();

847 848 849
    //  Check whether the library haven't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
somdoron's avatar
somdoron committed
850
        EXIT_MUTEX();
851 852
        return -1;
    }
malosek's avatar
malosek committed
853

854
    //  Check whether endpoint address passed to the function is valid.
855 856
    if (unlikely (!addr_)) {
        errno = EINVAL;
somdoron's avatar
somdoron committed
857
        EXIT_MUTEX();
858 859 860
        return -1;
    }

861 862 863
    //  Process pending commands, if any, since there could be pending unprocessed process_own()'s
    //  (from launch_child() for example) we're asked to terminate now.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
864 865
    if (unlikely(rc != 0)) {
        EXIT_MUTEX();
866
        return -1;
somdoron's avatar
somdoron committed
867
    }
868

869 870 871
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
872 873
    if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) {
        EXIT_MUTEX();
874
        return -1;
somdoron's avatar
somdoron committed
875
    }
876 877 878

    // Disconnect an inproc socket
    if (protocol == "inproc") {
somdoron's avatar
somdoron committed
879 880
        if (unregister_endpoint (std::string(addr_), this) == 0) {
            EXIT_MUTEX();
Martin Hurton's avatar
Martin Hurton committed
881
            return 0;
somdoron's avatar
somdoron committed
882
        }
883 884 885
        std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
        if (range.first == range.second) {
            errno = ENOENT;
somdoron's avatar
somdoron committed
886
            EXIT_MUTEX();
887 888
            return -1;
        }
Martin Hurton's avatar
Martin Hurton committed
889

890
        for (inprocs_t::iterator it = range.first; it != range.second; ++it)
Martin Hurton's avatar
Martin Hurton committed
891
            it->second->terminate (true);
892
        inprocs.erase (range.first, range.second);
somdoron's avatar
somdoron committed
893
        EXIT_MUTEX();
894 895 896
        return 0;
    }

897 898
    //  Find the endpoints range (if any) corresponding to the addr_ string.
    std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
899 900
    if (range.first == range.second) {
        errno = ENOENT;
somdoron's avatar
somdoron committed
901
        EXIT_MUTEX();
902
        return -1;
903
    }
904

905 906 907
    for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
        //  If we have an associated pipe, terminate it.
        if (it->second.second != NULL)
Martin Hurton's avatar
Martin Hurton committed
908
            it->second.second->terminate (false);
909
        term_child (it->second.first);
910
    }
911
    endpoints.erase (range.first, range.second);
somdoron's avatar
somdoron committed
912
    EXIT_MUTEX();
913
    return 0;
914 915
}

916
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
    ENTER_MUTEX ();

    //  Refuse to work once the context has been terminated.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        EXIT_MUTEX ();
        return -1;
    }

    //  The message must be present and pass its own validity check.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        EXIT_MUTEX ();
        return -1;
    }

    //  Process pending commands, if any (throttled).
    if (unlikely (process_commands (0, true) != 0)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  Drop any user-visible 'more' flag already on the message and then
    //  impose the one requested through flags_.
    msg_->reset_flags (msg_t::more);
    if (flags_ & ZMQ_SNDMORE)
        msg_->set_flags (msg_t::more);

    msg_->reset_metadata ();

    //  First attempt to send the message.
    if (xsend (msg_) == 0) {
        EXIT_MUTEX ();
        return 0;
    }

    //  Any error other than EAGAIN is propagated as is. For a non-blocking
    //  send, EAGAIN is propagated up the stack as well.
    if (unlikely (errno != EAGAIN)
    ||  (flags_ & ZMQ_DONTWAIT)
    ||  options.sndtimeo == 0) {
        EXIT_MUTEX ();
        return -1;
    }

    //  Compute the deadline. A negative timeout means "wait forever", in
    //  which case the deadline is never consulted.
    int remaining = options.sndtimeo;
    const uint64_t deadline =
        remaining < 0 ? 0 : (clock.now_ms () + remaining);

    //  The message could not be sent. Wait for the next command, process
    //  it and retry. If the timeout expires in the meantime, fail with
    //  EAGAIN.
    for (;;) {
        if (unlikely (process_commands (remaining, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }

        if (xsend (msg_) == 0) {
            EXIT_MUTEX ();
            return 0;
        }

        if (unlikely (errno != EAGAIN)) {
            EXIT_MUTEX ();
            return -1;
        }

        //  Infinite wait: nothing to recompute.
        if (remaining <= 0)
            continue;

        remaining = (int) (deadline - clock.now_ms ());
        if (remaining <= 0) {
            errno = EAGAIN;
            EXIT_MUTEX ();
            return -1;
        }
    }
}

1002
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
    ENTER_MUTEX ();

    //  Refuse to work once the context has been terminated.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        EXIT_MUTEX ();
        return -1;
    }

    //  The message must be present and pass its own validity check.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        EXIT_MUTEX ();
        return -1;
    }

    //  Every inbound_poll_rate calls, process incoming commands. This only
    //  triggers when messages are continuously available; whenever commands
    //  are processed below, ticks is reset to zero and this code is avoided.
    //
    //  'recv' throttles by counting ticks rather than by reading the TSC
    //  as 'send' does, because counting is cheaper than doing RDTSC all
    //  the time.
    if (++ticks == inbound_poll_rate) {
        if (unlikely (process_commands (0, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }
        ticks = 0;
    }

    //  First attempt to get a message.
    int rc = xrecv (msg_);

    //  Got one: return immediately.
    if (rc == 0) {
        if (file_desc != retired_fd)
            msg_->set_fd (file_desc);
        extract_flags (msg_);
        EXIT_MUTEX ();
        return 0;
    }

    //  Errors other than EAGAIN are propagated.
    if (unlikely (errno != EAGAIN)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  Non-blocking receive: process commands once, in case an
    //  activate_reader command is already waiting in the command pipe,
    //  and retry. If there is still no message, report the failure.
    if ((flags_ & ZMQ_DONTWAIT) || options.rcvtimeo == 0) {
        if (unlikely (process_commands (0, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }
        ticks = 0;

        rc = xrecv (msg_);
        if (rc < 0) {
            EXIT_MUTEX ();
            return rc;
        }
        if (file_desc != retired_fd)
            msg_->set_fd (file_desc);
        extract_flags (msg_);

        EXIT_MUTEX ();
        return 0;
    }

    //  Compute the deadline. A negative timeout means "wait forever", in
    //  which case the deadline is never consulted.
    int remaining = options.rcvtimeo;
    const uint64_t deadline =
        remaining < 0 ? 0 : (clock.now_ms () + remaining);

    //  Blocking receive: keep processing commands until a message can be
    //  fetched. The first pass waits only if ticks is non-zero; every
    //  later pass waits.
    bool wait = (ticks != 0);
    for (;;) {
        if (unlikely (process_commands (wait ? remaining : 0, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }

        rc = xrecv (msg_);
        if (rc == 0) {
            ticks = 0;
            break;
        }

        if (unlikely (errno != EAGAIN)) {
            EXIT_MUTEX ();
            return -1;
        }

        wait = true;

        //  Infinite wait: nothing to recompute.
        if (remaining <= 0)
            continue;

        remaining = (int) (deadline - clock.now_ms ());
        if (remaining <= 0) {
            errno = EAGAIN;
            EXIT_MUTEX ();
            return -1;
        }
    }

    if (file_desc != retired_fd)
        msg_->set_fd (file_desc);
    extract_flags (msg_);
    EXIT_MUTEX ();
    return 0;
}

int zmq::socket_base_t::close ()
{
1118 1119 1120
    //  Mark the socket as dead
    tag = 0xdeadbeef;
    
1121 1122 1123 1124
    //  Transfer the ownership of the socket from this application thread
    //  to the reaper thread which will take care of the rest of shutdown
    //  process.
    send_reap (this);
1125

1126 1127 1128
    return 0;
}

1129 1130 1131 1132 1133 1134 1135 1136 1137 1138
//  Forwards to xhas_in () and returns its result.
bool zmq::socket_base_t::has_in ()
{
    const bool readable = xhas_in ();
    return readable;
}

//  Forwards to xhas_out () and returns its result.
bool zmq::socket_base_t::has_out ()
{
    const bool writable = xhas_out ();
    return writable;
}

1139
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
    //  Plug the socket to the reaper thread.
    poller = poller_;

    fd_t fd;

    if (thread_safe) {
        ENTER_MUTEX ();

        //  A thread-safe socket is polled through a dedicated signaler
        //  that is registered with the safe mailbox.
        reaper_signaler = new signaler_t ();
        fd = reaper_signaler->get_fd ();
        ((mailbox_safe_t *) mailbox)->add_signaler (reaper_signaler);

        //  Signal right away so that the reaper handles commands that
        //  already exist.
        reaper_signaler->send ();

        EXIT_MUTEX ();
    }
    else {
        //  Otherwise the mailbox's own descriptor is polled.
        fd = ((mailbox_t *) mailbox)->get_fd ();
    }

    handle = poller->add_fd (fd, this);
    poller->set_pollin (handle);

    //  Initialise the termination and check whether the socket can be
    //  deallocated immediately.
    terminate ();
    check_destroy ();
}

1172
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
Martin Sustrik's avatar
Martin Sustrik committed
1173
{
1174
    int rc;
1175
    command_t cmd;
1176 1177 1178
    if (timeout_ != 0) {

        //  If we are asked to wait, simply ask mailbox to wait.
somdoron's avatar
somdoron committed
1179
        rc = mailbox->recv (&cmd, timeout_);
1180 1181
    }
    else {
1182

1183 1184 1185
        //  If we are asked not to wait, check whether we haven't processed
        //  commands recently, so that we can throttle the new commands.

Martin Sustrik's avatar
Martin Sustrik committed
1186
        //  Get the CPU's tick counter. If 0, the counter is not available.
Martin Hurton's avatar
Martin Hurton committed
1187
        const uint64_t tsc = zmq::clock_t::rdtsc ();
Martin Sustrik's avatar
Martin Sustrik committed
1188

1189 1190 1191 1192 1193 1194
        //  Optimised version of command processing - it doesn't have to check
        //  for incoming commands each time. It does so only if certain time
        //  elapsed since last command processing. Command delay varies
        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
        //  etc. The optimisation makes sense only on platforms where getting
        //  a timestamp is a very cheap operation (tens of nanoseconds).
Martin Sustrik's avatar
Martin Sustrik committed
1195 1196
        if (tsc && throttle_) {

Martin Sustrik's avatar
Martin Sustrik committed
1197 1198 1199
            //  Check whether TSC haven't jumped backwards (in case of migration
            //  between CPU cores) and whether certain time have elapsed since
            //  last command processing. If it didn't do nothing.
Martin Sustrik's avatar
Martin Sustrik committed
1200
            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
1201
                return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1202
            last_tsc = tsc;
1203 1204 1205
        }

        //  Check whether there are any commands pending for this thread.
somdoron's avatar
somdoron committed
1206
        rc = mailbox->recv (&cmd, 0);
1207
    }
Martin Sustrik's avatar
Martin Sustrik committed
1208

1209 1210
    //  Process all available commands.
    while (rc == 0) {
1211
        cmd.destination->process_command (cmd);
somdoron's avatar
somdoron committed
1212
        rc = mailbox->recv (&cmd, 0);
1213 1214 1215 1216 1217 1218
    }

    if (errno == EINTR)
        return -1;

    zmq_assert (errno == EAGAIN);
1219

1220
    if (ctx_terminated) {
1221 1222
        errno = ETERM;
        return -1;
1223
    }
1224 1225

    return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1226 1227
}

1228
void zmq::socket_base_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
1229
{
1230
    //  Here, someone have called zmq_term while the socket was still alive.
1231
    //  We'll remember the fact so that any blocking call is interrupted and any
1232 1233
    //  further attempt to use the socket will return ETERM. The user is still
    //  responsible for calling zmq_close on the socket though!
1234
    stop_monitor ();
1235
    ctx_terminated = true;
Martin Sustrik's avatar
Martin Sustrik committed
1236 1237
}

1238
//  Handles the 'bind' command by attaching the supplied pipe to the socket.
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
{
    pipe_t *const bound_pipe = pipe_;
    attach_pipe (bound_pipe);
}

1243
void zmq::socket_base_t::process_term (int linger_)
1244
{
1245 1246 1247 1248 1249
    //  Unregister all inproc endpoints associated with this socket.
    //  Doing this we make sure that no new pipes from other sockets (inproc)
    //  will be initiated.
    unregister_endpoints (this);

1250 1251
    //  Ask all attached pipes to terminate.
    for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
1252
        pipes [i]->terminate (false);
Martin Sustrik's avatar
Martin Sustrik committed
1253
    register_term_acks ((int) pipes.size ());
1254

1255
    //  Continue the termination process immediately.
1256
    own_t::process_term (linger_);
1257 1258
}

1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270
void zmq::socket_base_t::update_pipe_options(int option_)
{
	if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) 
	{
		for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
		{
			pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
		}
	}

}

1271 1272 1273 1274 1275
//  Only records that destruction was requested; check_destroy () acts on
//  the flag.
void zmq::socket_base_t::process_destroy ()
{
    destroyed = true;
}

1276
//  socket_base_t's own implementation accepts no options: it always fails
//  with EINVAL.
int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
{
    const int failure = -1;
    errno = EINVAL;
    return failure;
}
1281 1282 1283 1284 1285 1286

//  socket_base_t's own implementation never reports that a message can be
//  sent.
bool zmq::socket_base_t::xhas_out ()
{
    const bool writable = false;
    return writable;
}

1287
//  socket_base_t's own implementation does not support sending: it always
//  fails with ENOTSUP.
int zmq::socket_base_t::xsend (msg_t *)
{
    const int failure = -1;
    errno = ENOTSUP;
    return failure;
}

//  socket_base_t's own implementation never reports that a message is
//  available.
bool zmq::socket_base_t::xhas_in ()
{
    const bool readable = false;
    return readable;
}

1298
//  socket_base_t's own implementation does not support receiving: it always
//  fails with ENOTSUP.
int zmq::socket_base_t::xrecv (msg_t *)
{
    const int failure = -1;
    errno = ENOTSUP;
    return failure;
}

1304 1305 1306 1307 1308
//  socket_base_t's own implementation has no credential to offer: it
//  returns a default-constructed blob.
zmq::blob_t zmq::socket_base_t::get_credential () const
{
    return zmq::blob_t ();
}

1309
//  socket_base_t's own implementation must never be reached: it asserts
//  unconditionally.
void zmq::socket_base_t::xread_activated (pipe_t *)
{
    const bool reachable = false;
    zmq_assert (reachable);
}
1313
//  socket_base_t's own implementation must never be reached: it asserts
//  unconditionally.
void zmq::socket_base_t::xwrite_activated (pipe_t *)
{
    const bool reachable = false;
    zmq_assert (reachable);
}

1318
//  socket_base_t's own implementation must never be reached: it asserts
//  unconditionally.
void zmq::socket_base_t::xhiccuped (pipe_t *)
{
    const bool reachable = false;
    zmq_assert (reachable);
}

1323 1324
void zmq::socket_base_t::in_event ()
{
1325 1326 1327 1328
    //  This function is invoked only once the socket is running in the context
    //  of the reaper thread. Process any commands from other threads/sockets
    //  that may be available at the moment. Ultimately, the socket will
    //  be destroyed.
somdoron's avatar
somdoron committed
1329 1330

    ENTER_MUTEX();
1331 1332 1333 1334 1335
    
    //  If the socket is thread safe we need to unsignal the reaper signaler
    if (thread_safe)
        reaper_signaler->recv();
    
1336
    process_commands (0, false);
somdoron's avatar
somdoron committed
1337
    EXIT_MUTEX();
1338 1339 1340 1341 1342 1343 1344 1345
    check_destroy ();
}

//  The socket is never expected to receive an out event: asserts
//  unconditionally.
void zmq::socket_base_t::out_event ()
{
    const bool reachable = false;
    zmq_assert (reachable);
}

1346
void zmq::socket_base_t::timer_event (int)
1347 1348 1349
{
    zmq_assert (false);
}
1350

1351 1352
void zmq::socket_base_t::check_destroy ()
{
1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368
    //  If the object was already marked as destroyed, finish the deallocation.
    if (destroyed) {

        //  Remove the socket from the reaper's poller.
        poller->rm_fd (handle);

        //  Remove the socket from the context.
        destroy_socket (this);

        //  Notify the reaper about the fact.
        send_reaped ();

        //  Deallocate.
        own_t::process_destroy ();
    }
}
1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379

//  Forwards the notification for the given pipe to xread_activated ().
void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
    pipe_t *const activated = pipe_;
    xread_activated (activated);
}

//  Forwards the notification for the given pipe to xwrite_activated ().
void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
    pipe_t *const activated = pipe_;
    xwrite_activated (activated);
}

1380 1381
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
{
    //  Unless the 'immediate' option is set to 1, pass the hiccup on to
    //  the specific socket type.
    if (options.immediate != 1) {
        xhiccuped (pipe_);
        return;
    }

    //  With immediate == 1 the pipe is terminated instead.
    pipe_->terminate (false);
}

1389
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
{
    //  Let the specific socket type know about the pipe termination.
    xpipe_terminated (pipe_);

    //  Drop the first inproc entry that refers to this pipe, if any.
    inprocs_t::iterator pos = inprocs.begin ();
    while (pos != inprocs.end () && pos->second != pipe_)
        ++pos;
    if (pos != inprocs.end ())
        inprocs.erase (pos);

    //  Remove the pipe from the list of attached pipes. If the socket is
    //  already shutting down, this also confirms the pipe's termination.
    pipes.erase (pipe_);
    if (is_terminating ()) {
        unregister_term_ack ();
    }
}

1408 1409
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
    //  An IDENTITY flag is only valid if this socket has recv_identity
    //  set.
    const bool carries_identity = (msg_->flags () & msg_t::identity) != 0;
    if (unlikely (carries_identity)) {
        zmq_assert (options.recv_identity);
    }

    //  Remember whether more message parts follow.
    rcvmore = (msg_->flags () & msg_t::more) != 0;
}
1417

1418
int zmq::socket_base_t::monitor (const char *addr_, int events_)
1419
{
1420 1421 1422 1423
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }
1424
    //  Support deregistering monitoring endpoints as well
1425 1426 1427 1428 1429 1430 1431
    if (addr_ == NULL) {
        stop_monitor ();
        return 0;
    }
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
Pieter Hintjens's avatar
Pieter Hintjens committed
1432
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
1433 1434
        return -1;

1435
    //  Event notification only supported over inproc://
1436 1437 1438 1439
    if (protocol != "inproc") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
1440
    //  Register events to monitor
1441
    monitor_events = events_;
Martin Hurton's avatar
Martin Hurton committed
1442
    monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
1443 1444 1445
    if (monitor_socket == NULL)
        return -1;

1446
    //  Never block context termination on pending event messages
1447
    int linger = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
1448
    int rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1449
    if (rc == -1)
Pieter Hintjens's avatar
Pieter Hintjens committed
1450
        stop_monitor ();
1451

1452
    //  Spawn the monitor socket endpoint
1453 1454 1455 1456 1457 1458
    rc = zmq_bind (monitor_socket, addr_);
    if (rc == -1)
         stop_monitor ();
    return rc;
}

1459 1460 1461 1462 1463 1464 1465 1466 1467 1468
//  Stores the descriptor that recv () later attaches to received messages
//  (unless it equals retired_fd).
void zmq::socket_base_t::set_fd (zmq::fd_t fd_)
{
    const zmq::fd_t descriptor = fd_;
    file_desc = descriptor;
}

//  Returns the descriptor previously stored with set_fd ().
zmq::fd_t zmq::socket_base_t::fd ()
{
    const zmq::fd_t descriptor = file_desc;
    return descriptor;
}

Martin Hurton's avatar
Martin Hurton committed
1469
void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_)
1470
{
1471 1472
    if (monitor_events & ZMQ_EVENT_CONNECTED)
        monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_);
1473
}
1474

Martin Hurton's avatar
Martin Hurton committed
1475
void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_)
1476
{
1477 1478
    if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED)
        monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_);
1479
}
1480

Martin Hurton's avatar
Martin Hurton committed
1481
void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_)
1482
{
1483 1484
    if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED)
        monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_);
1485 1486
}

Martin Hurton's avatar
Martin Hurton committed
1487
void zmq::socket_base_t::event_listening (const std::string &addr_, int fd_)
1488
{
1489 1490
    if (monitor_events & ZMQ_EVENT_LISTENING)
        monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_);
1491 1492
}

Martin Hurton's avatar
Martin Hurton committed
1493
void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
1494
{
1495 1496
    if (monitor_events & ZMQ_EVENT_BIND_FAILED)
        monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_);
1497 1498
}

Martin Hurton's avatar
Martin Hurton committed
1499
void zmq::socket_base_t::event_accepted (const std::string &addr_, int fd_)
1500
{
1501 1502
    if (monitor_events & ZMQ_EVENT_ACCEPTED)
        monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_);
1503 1504
}

Martin Hurton's avatar
Martin Hurton committed
1505
void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_)
1506
{
1507 1508
    if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED)
        monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_);
1509 1510
}

Martin Hurton's avatar
Martin Hurton committed
1511
void zmq::socket_base_t::event_closed (const std::string &addr_, int fd_)
1512
{
1513 1514
    if (monitor_events & ZMQ_EVENT_CLOSED)
        monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_);
1515
}
Martin Hurton's avatar
Martin Hurton committed
1516 1517

void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
1518
{
1519 1520
    if (monitor_events & ZMQ_EVENT_CLOSE_FAILED)
        monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_);
1521 1522
}

Martin Hurton's avatar
Martin Hurton committed
1523
void zmq::socket_base_t::event_disconnected (const std::string &addr_, int fd_)
1524
{
1525 1526
    if (monitor_events & ZMQ_EVENT_DISCONNECTED)
        monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_);
1527 1528
}

1529 1530
//  Send a monitor event
void zmq::socket_base_t::monitor_event (int event_, int value_, const std::string &addr_)
1531
{
1532
    if (monitor_socket) {
1533
        //  Send event in first frame
1534
        zmq_msg_t msg;
1535
        zmq_msg_init_size (&msg, 6);
1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548
#ifdef ZMQ_HAVE_HPUX
        // avoid SIGBUS
        union {
          uint8_t data[6];
          struct {
            uint16_t event;
            uint32_t value;
          } v;
        } u;
        u.v.event = event_;
        u.v.value = value_;
        memcpy(zmq_msg_data (&msg), u.data, 6);
#else
1549 1550 1551
        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
        *(uint16_t *) (data + 0) = (uint16_t) event_;
        *(uint32_t *) (data + 2) = (uint32_t) value_;
1552
#endif
1553
        zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
1554 1555

        //  Send address in second frame
1556
        zmq_msg_init_size (&msg, addr_.size());
1557
        memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
1558 1559
        zmq_sendmsg (monitor_socket, &msg, 0);
    }
1560 1561
}

1562
void zmq::socket_base_t::stop_monitor (void)
1563 1564
{
    if (monitor_socket) {
1565 1566
        if (monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
            monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
1567 1568 1569 1570
        zmq_close (monitor_socket);
        monitor_socket = NULL;
        monitor_events = 0;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
1571
}