socket_base.cpp 44.3 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include <new>
31
#include <string>
32 33
#include <algorithm>

34 35 36 37 38
#include "platform.hpp"

#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#if defined _MSC_VER
39
#if defined _WIN32_WCE
boris@boressoft.ru's avatar
boris@boressoft.ru committed
40 41
#include <cmnintrin.h>
#else
42 43
#include <intrin.h>
#endif
boris@boressoft.ru's avatar
boris@boressoft.ru committed
44
#endif
45 46 47
#else
#include <unistd.h>
#endif
48

49
#include "socket_base.hpp"
50
#include "tcp_listener.hpp"
51
#include "ipc_listener.hpp"
52
#include "tipc_listener.hpp"
53
#include "tcp_connecter.hpp"
54
#include "io_thread.hpp"
55
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
56
#include "config.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
57
#include "pipe.hpp"
58
#include "err.hpp"
59
#include "ctx.hpp"
malosek's avatar
malosek committed
60
#include "platform.hpp"
61
#include "likely.hpp"
62
#include "msg.hpp"
63 64 65
#include "address.hpp"
#include "ipc_address.hpp"
#include "tcp_address.hpp"
66
#include "tipc_address.hpp"
somdoron's avatar
somdoron committed
67 68
#include "mailbox.hpp"
#include "mailbox_safe.hpp"
69 70 71
#ifdef ZMQ_HAVE_OPENPGM
#include "pgm_socket.hpp"
#endif
72

73 74 75 76 77 78 79
#include "pair.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "pull.hpp"
#include "push.hpp"
80 81
#include "dealer.hpp"
#include "router.hpp"
82 83
#include "xpub.hpp"
#include "xsub.hpp"
84
#include "stream.hpp"
85
#include "server.hpp"
86
#include "client.hpp"
87

somdoron's avatar
somdoron committed
88 89 90 91 92 93 94 95
#define ENTER_MUTEX() \
    if (thread_safe) \
        sync.lock();

#define EXIT_MUTEX() \
    if (thread_safe) \
        sync.unlock();

96 97 98 99 100
bool zmq::socket_base_t::check_tag ()
{
    return tag == 0xbaddecaf;
}

101
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
102
    uint32_t tid_, int sid_)
103 104 105
{
    socket_base_t *s = NULL;
    switch (type_) {
Pieter Hintjens's avatar
Pieter Hintjens committed
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
        case ZMQ_PAIR:
            s = new (std::nothrow) pair_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUB:
            s = new (std::nothrow) pub_t (parent_, tid_, sid_);
            break;
        case ZMQ_SUB:
            s = new (std::nothrow) sub_t (parent_, tid_, sid_);
            break;
        case ZMQ_REQ:
            s = new (std::nothrow) req_t (parent_, tid_, sid_);
            break;
        case ZMQ_REP:
            s = new (std::nothrow) rep_t (parent_, tid_, sid_);
            break;
        case ZMQ_DEALER:
            s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
            break;
        case ZMQ_ROUTER:
            s = new (std::nothrow) router_t (parent_, tid_, sid_);
            break;
        case ZMQ_PULL:
            s = new (std::nothrow) pull_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUSH:
            s = new (std::nothrow) push_t (parent_, tid_, sid_);
            break;
        case ZMQ_XPUB:
            s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
            break;
        case ZMQ_XSUB:
            s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
            break;
        case ZMQ_STREAM:
            s = new (std::nothrow) stream_t (parent_, tid_, sid_);
            break;
142 143 144
        case ZMQ_SERVER:
            s = new (std::nothrow) server_t (parent_, tid_, sid_);
            break;
145 146 147
        case ZMQ_CLIENT:
            s = new (std::nothrow) client_t (parent_, tid_, sid_);
            break;
Pieter Hintjens's avatar
Pieter Hintjens committed
148 149 150
        default:
            errno = EINVAL;
            return NULL;
151
    }
152 153

    alloc_assert (s);
somdoron's avatar
somdoron committed
154 155 156

    mailbox_t *mailbox = dynamic_cast<mailbox_t*> (s->mailbox);

157 158 159
    if (mailbox != NULL && mailbox->get_fd () == retired_fd) {
        s->destroyed = true;
        delete s;
Pieter Hintjens's avatar
Pieter Hintjens committed
160
        return NULL;
161
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
162

163 164 165
    return s;
}

somdoron's avatar
somdoron committed
166
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
Martin Sustrik's avatar
Martin Sustrik committed
167
    own_t (parent_, tid_),
168
    tag (0xbaddecaf),
169
    ctx_terminated (false),
170
    destroyed (false),
Martin Sustrik's avatar
Martin Sustrik committed
171
    last_tsc (0),
Martin Sustrik's avatar
Martin Sustrik committed
172
    ticks (0),
173
    rcvmore (false),
174
    file_desc(-1),
175
    monitor_socket (NULL),
somdoron's avatar
somdoron committed
176
    monitor_events (0),
177 178
    thread_safe (thread_safe_),
    reaper_signaler (NULL)
Martin Sustrik's avatar
Martin Sustrik committed
179
{
180
    options.socket_id = sid_;
181
    options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
182
    options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;
somdoron's avatar
somdoron committed
183 184 185 186 187

    if (thread_safe)
        mailbox = new mailbox_safe_t(&sync);
    else
        mailbox = new mailbox_t();
188 189 190 191
}

zmq::socket_base_t::~socket_base_t ()
{
somdoron's avatar
somdoron committed
192
    delete mailbox;
193 194 195 196
    
    if (reaper_signaler)
        delete reaper_signaler;
    
197
    stop_monitor ();
198
    zmq_assert (destroyed);
199 200
}

somdoron's avatar
somdoron committed
201
zmq::i_mailbox *zmq::socket_base_t::get_mailbox ()
202
{
somdoron's avatar
somdoron committed
203
    return mailbox;
204 205 206 207 208 209 210 211 212 213 214
}

void zmq::socket_base_t::stop ()
{
    //  Called by ctx when it is terminated (zmq_term).
    //  'stop' command is sent from the threads that called zmq_term to
    //  the thread owning the socket. This way, blocking call in the
    //  owner thread can be interrupted.
    send_stop ();
}

215 216 217 218 219 220 221 222 223 224 225 226 227
int zmq::socket_base_t::parse_uri (const char *uri_,
                        std::string &protocol_, std::string &address_)
{
    zmq_assert (uri_ != NULL);

    std::string uri (uri_);
    std::string::size_type pos = uri.find ("://");
    if (pos == std::string::npos) {
        errno = EINVAL;
        return -1;
    }
    protocol_ = uri.substr (0, pos);
    address_ = uri.substr (pos + 3);
Martin Hurton's avatar
Martin Hurton committed
228

229 230 231 232 233 234 235
    if (protocol_.empty () || address_.empty ()) {
        errno = EINVAL;
        return -1;
    }
    return 0;
}

236 237 238
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
    //  First check out whether the protcol is something we are aware of.
Pieter Hintjens's avatar
Pieter Hintjens committed
239 240 241 242 243 244 245
    if (protocol_ != "inproc"
    &&  protocol_ != "ipc"
    &&  protocol_ != "tcp"
    &&  protocol_ != "pgm"
    &&  protocol_ != "epgm"
    &&  protocol_ != "tipc"
    &&  protocol_ != "norm") {
246 247 248 249 250 251 252 253 254 255 256
        errno = EPROTONOSUPPORT;
        return -1;
    }
    //  If 0MQ is not compiled with OpenPGM, pgm and epgm transports
    //  are not avaialble.
#if !defined ZMQ_HAVE_OPENPGM
    if (protocol_ == "pgm" || protocol_ == "epgm") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif
Martin Hurton's avatar
Martin Hurton committed
257

bebopagogo's avatar
bebopagogo committed
258 259 260 261 262 263
#if !defined ZMQ_HAVE_NORM
    if (protocol_ == "norm") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif // !ZMQ_HAVE_NORM
264 265 266

    //  IPC transport is not available on Windows and OpenVMS.
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
267
    if (protocol_ == "ipc") {
268 269 270 271 272 273
        //  Unknown protocol.
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

274
    // TIPC transport is only available on Linux.
275
#if !defined ZMQ_HAVE_TIPC
Erik Hugne's avatar
Erik Hugne committed
276
    if (protocol_ == "tipc") {
277 278 279 280 281
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

282 283 284
    //  Check whether socket type and transport protocol match.
    //  Specifically, multicast protocols can't be combined with
    //  bi-directional messaging patterns (socket types).
bebopagogo's avatar
bebopagogo committed
285
    if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") &&
286 287
          options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
          options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
288 289 290 291 292 293 294 295
        errno = ENOCOMPATPROTO;
        return -1;
    }

    //  Protocol is available.
    return 0;
}

296
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
297
{
298 299 300 301
    //  First, register the pipe so that we can terminate it later on.
    pipe_->set_event_sink (this);
    pipes.push_back (pipe_);
    
302
    //  Let the derived socket type know about new pipe.
303
    xattach_pipe (pipe_, subscribe_to_all_);
304 305 306 307 308

    //  If the socket is already being closed, ask any new pipes to terminate
    //  straight away.
    if (is_terminating ()) {
        register_term_acks (1);
309
        pipe_->terminate (false);
310
    }
311 312
}

313
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
314
    size_t optvallen_)
315
{
somdoron's avatar
somdoron committed
316 317
    ENTER_MUTEX();

318 319 320 321 322 323 324
	if (!options.is_valid(option_)) {
		errno = EINVAL;
		EXIT_MUTEX();
		return -1;
	}


325
    if (unlikely (ctx_terminated)) {
326
        errno = ETERM;
somdoron's avatar
somdoron committed
327
        EXIT_MUTEX();
328 329 330
        return -1;
    }

331 332
    //  First, check whether specific socket type overloads the option.
    int rc = xsetsockopt (option_, optval_, optvallen_);
somdoron's avatar
somdoron committed
333 334
    if (rc == 0 || errno != EINVAL) {
        EXIT_MUTEX();
335
        return rc;
somdoron's avatar
somdoron committed
336
    }
337 338 339

    //  If the socket type doesn't support the option, pass it to
    //  the generic option parser.
somdoron's avatar
somdoron committed
340
    rc = options.setsockopt (option_, optval_, optvallen_);
341
	update_pipe_options(option_);
somdoron's avatar
somdoron committed
342 343 344

    EXIT_MUTEX();
    return rc;
345 346
}

347 348 349
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
    size_t *optvallen_)
{
somdoron's avatar
somdoron committed
350 351
    ENTER_MUTEX();

352
    if (unlikely (ctx_terminated)) {
353
        errno = ETERM;
somdoron's avatar
somdoron committed
354
        EXIT_MUTEX();
355 356 357
        return -1;
    }

358
    if (option_ == ZMQ_RCVMORE) {
359
        if (*optvallen_ < sizeof (int)) {
360
            errno = EINVAL;
somdoron's avatar
somdoron committed
361
            EXIT_MUTEX();
362 363
            return -1;
        }
364 365
        *((int*) optval_) = rcvmore ? 1 : 0;
        *optvallen_ = sizeof (int);
somdoron's avatar
somdoron committed
366
        EXIT_MUTEX();
367 368 369
        return 0;
    }

370 371 372
    if (option_ == ZMQ_FD) {
        if (*optvallen_ < sizeof (fd_t)) {
            errno = EINVAL;
somdoron's avatar
somdoron committed
373
            EXIT_MUTEX();
374 375
            return -1;
        }
somdoron's avatar
somdoron committed
376 377 378 379 380 381 382 383 384 385 386 387

        if (thread_safe) {
            // thread safe socket doesn't provide file descriptor
            errno = EINVAL;
            EXIT_MUTEX();
            return -1;
        }
        
        *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
        *optvallen_ = sizeof(fd_t);                      
        
        EXIT_MUTEX();
388 389 390 391
        return 0;
    }

    if (option_ == ZMQ_EVENTS) {
392
        if (*optvallen_ < sizeof (int)) {
393
            errno = EINVAL;
somdoron's avatar
somdoron committed
394
            EXIT_MUTEX();
395 396
            return -1;
        }
397
        int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
398 399
        if (rc != 0 && (errno == EINTR || errno == ETERM)) {
            EXIT_MUTEX();
400
            return -1;
somdoron's avatar
somdoron committed
401
        }
402
        errno_assert (rc == 0);
403
        *((int*) optval_) = 0;
404
        if (has_out ())
405
            *((int*) optval_) |= ZMQ_POLLOUT;
406
        if (has_in ())
407 408
            *((int*) optval_) |= ZMQ_POLLIN;
        *optvallen_ = sizeof (int);
somdoron's avatar
somdoron committed
409
        EXIT_MUTEX();
410 411 412
        return 0;
    }

413 414 415
    if (option_ == ZMQ_LAST_ENDPOINT) {
        if (*optvallen_ < last_endpoint.size () + 1) {
            errno = EINVAL;
somdoron's avatar
somdoron committed
416
            EXIT_MUTEX();
417 418 419 420
            return -1;
        }
        strcpy (static_cast <char *> (optval_), last_endpoint.c_str ());
        *optvallen_ = last_endpoint.size () + 1;
somdoron's avatar
somdoron committed
421
        EXIT_MUTEX();
422 423 424
        return 0;
    }

somdoron's avatar
somdoron committed
425 426 427
    int rc = options.getsockopt (option_, optval_, optvallen_);
    EXIT_MUTEX();
    return rc;
428 429
}

430 431
int zmq::socket_base_t::bind (const char *addr_)
{
somdoron's avatar
somdoron committed
432 433
    ENTER_MUTEX();

434
    if (unlikely (ctx_terminated)) {
435
        errno = ETERM;
somdoron's avatar
somdoron committed
436
        EXIT_MUTEX();
437 438 439
        return -1;
    }

440 441
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
442 443
    if (unlikely (rc != 0)) {
        EXIT_MUTEX();
444
        return -1;
somdoron's avatar
somdoron committed
445
    }
446

447
    //  Parse addr_ string.
448 449
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
450 451
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
        EXIT_MUTEX();
452
        return -1;
somdoron's avatar
somdoron committed
453
    }
454

Pieter Hintjens's avatar
Pieter Hintjens committed
455
    if (protocol == "inproc") {
456
        const endpoint_t endpoint = { this, options };
Martin Hurton's avatar
Martin Hurton committed
457
        const int rc = register_endpoint (addr_, endpoint);
458
        if (rc == 0) {
Martin Hurton's avatar
Martin Hurton committed
459
            connect_pending (addr_, this);
460
            last_endpoint.assign (addr_);
461
            options.connected = true;
462
        }
somdoron's avatar
somdoron committed
463
        EXIT_MUTEX();
464
        return rc;
465
    }
466

bebopagogo's avatar
bebopagogo committed
467
    if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
468
        //  For convenience's sake, bind can be used interchageable with
bebopagogo's avatar
bebopagogo committed
469
        //  connect for PGM, EPGM and NORM transports.
somdoron's avatar
somdoron committed
470
        EXIT_MUTEX();
471 472 473 474
        rc = connect (addr_);
        if (rc != -1)
            options.connected = true;
        return rc;
475 476 477 478 479 480 481
    }

    //  Remaining trasnports require to be run in an I/O thread, so at this
    //  point we'll choose one.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
somdoron's avatar
somdoron committed
482
        EXIT_MUTEX();
483 484
        return -1;
    }
485

486
    if (protocol == "tcp") {
487
        tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
488
            io_thread, this, options);
489
        alloc_assert (listener);
490
        int rc = listener->set_address (address.c_str ());
491 492
        if (rc != 0) {
            delete listener;
493
            event_bind_failed (address, zmq_errno());
somdoron's avatar
somdoron committed
494
            EXIT_MUTEX();
495
            return -1;
496
        }
497

498
        // Save last endpoint URI
499
        listener->get_address (last_endpoint);
500

501
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
502
        options.connected = true;
somdoron's avatar
somdoron committed
503
        EXIT_MUTEX();
504 505 506
        return 0;
    }

507
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
508 509 510 511
    if (protocol == "ipc") {
        ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
512
        int rc = listener->set_address (address.c_str ());
513 514
        if (rc != 0) {
            delete listener;
515
            event_bind_failed (address, zmq_errno());
somdoron's avatar
somdoron committed
516
            EXIT_MUTEX();
517 518
            return -1;
        }
519

520
        // Save last endpoint URI
521
        listener->get_address (last_endpoint);
522

523
        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
524
        options.connected = true;
somdoron's avatar
somdoron committed
525
        EXIT_MUTEX();
526
        return 0;
527
    }
528
#endif
529
#if defined ZMQ_HAVE_TIPC
530 531 532 533 534 535 536 537
    if (protocol == "tipc") {
         tipc_listener_t *listener = new (std::nothrow) tipc_listener_t (
              io_thread, this, options);
         alloc_assert (listener);
         int rc = listener->set_address (address.c_str ());
         if (rc != 0) {
             delete listener;
             event_bind_failed (address, zmq_errno());
somdoron's avatar
somdoron committed
538
             EXIT_MUTEX();
539 540 541 542 543 544 545
             return -1;
         }

        // Save last endpoint URI
        listener->get_address (last_endpoint);

        add_endpoint (addr_, (own_t *) listener, NULL);
546
        options.connected = true;
somdoron's avatar
somdoron committed
547
        EXIT_MUTEX();
548 549 550
        return 0;
    }
#endif
551

somdoron's avatar
somdoron committed
552
    EXIT_MUTEX();
553
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
554
    return -1;
555 556
}

557
int zmq::socket_base_t::connect (const char *addr_)
558
{
somdoron's avatar
somdoron committed
559 560
    ENTER_MUTEX();

561
    if (unlikely (ctx_terminated)) {
562
        errno = ETERM;
somdoron's avatar
somdoron committed
563
        EXIT_MUTEX();
564 565 566
        return -1;
    }

567 568
    //  Process pending commands, if any.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
569 570
    if (unlikely (rc != 0)) {
        EXIT_MUTEX();
571
        return -1;
somdoron's avatar
somdoron committed
572
    }
573

malosek's avatar
malosek committed
574
    //  Parse addr_ string.
575 576
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
577 578
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
        EXIT_MUTEX();
579
        return -1;
somdoron's avatar
somdoron committed
580
    }
malosek's avatar
malosek committed
581

Pieter Hintjens's avatar
Pieter Hintjens committed
582
    if (protocol == "inproc") {
583

584 585 586 587
        //  TODO: inproc connect is specific with respect to creating pipes
        //  as there's no 'reconnect' functionality implemented. Once that
        //  is in place we should follow generic pipe creation algorithm.

588 589
        //  Find the peer endpoint.
        endpoint_t peer = find_endpoint (addr_);
590

591
        // The total HWM for an inproc connection should be the sum of
592
        // the binder's HWM and the connector's HWM.
Martin Hurton's avatar
Martin Hurton committed
593
        int sndhwm = 0;
594 595 596
        if (peer.socket == NULL)
            sndhwm = options.sndhwm;
        else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
597
            sndhwm = options.sndhwm + peer.options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
598
        int rcvhwm = 0;
599 600
        if (peer.socket == NULL)
            rcvhwm = options.rcvhwm;
Martin Hurton's avatar
Martin Hurton committed
601 602
        else
        if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
603
            rcvhwm = options.rcvhwm + peer.options.sndhwm;
604

605
        //  Create a bi-directional pipe to connect the peers.
606
        object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
607
        pipe_t *new_pipes [2] = {NULL, NULL};
608 609 610 611 612 613 614 615 616 617

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
        bool conflates [2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
618
        int rc = pipepair (parents, new_pipes, hwms, conflates);
619 620 621 622 623
        if (!conflate) {
            new_pipes[0]->set_hwms_boost(peer.options.sndhwm, peer.options.rcvhwm);
            new_pipes[1]->set_hwms_boost(options.sndhwm, options.rcvhwm);
        }

624
        errno_assert (rc == 0);
625

626 627 628 629 630 631 632 633 634 635 636 637 638 639
        if (!peer.socket) {
            //  The peer doesn't exist yet so we don't know whether
            //  to send the identity message or not. To resolve this,
            //  we always send our identity and drop it later if
            //  the peer doesn't expect it.
            msg_t id;
            rc = id.init_size (options.identity_size);
            errno_assert (rc == 0);
            memcpy (id.data (), options.identity, options.identity_size);
            id.set_flags (msg_t::identity);
            bool written = new_pipes [0]->write (&id);
            zmq_assert (written);
            new_pipes [0]->flush ();

Martin Hurton's avatar
Martin Hurton committed
640 641
            const endpoint_t endpoint = {this, options};
            pend_connection (std::string (addr_), endpoint, new_pipes);
642
        }
Martin Hurton's avatar
Martin Hurton committed
643
        else {
644 645 646 647 648 649 650 651 652 653 654
            //  If required, send the identity of the local socket to the peer.
            if (peer.options.recv_identity) {
                msg_t id;
                rc = id.init_size (options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), options.identity, options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [0]->write (&id);
                zmq_assert (written);
                new_pipes [0]->flush ();
            }
655

656 657 658 659 660 661 662 663 664 665 666
            //  If required, send the identity of the peer to the local socket.
            if (options.recv_identity) {
                msg_t id;
                rc = id.init_size (peer.options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), peer.options.identity, peer.options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [1]->write (&id);
                zmq_assert (written);
                new_pipes [1]->flush ();
            }
667

668 669 670 671 672
            //  Attach remote end of the pipe to the peer socket. Note that peer's
            //  seqnum was incremented in find_endpoint function. We don't need it
            //  increased here.
            send_bind (peer.socket, new_pipes [1], false);
        }
673

Martin Hurton's avatar
Martin Hurton committed
674 675 676
        //  Attach local end of the pipe to this socket object.
        attach_pipe (new_pipes [0]);

677
        // Save last endpoint URI
678
        last_endpoint.assign (addr_);
679

680
        // remember inproc connections for disconnect
Martin Hurton's avatar
Martin Hurton committed
681
        inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
682

683
        options.connected = true;
somdoron's avatar
somdoron committed
684
        EXIT_MUTEX();
685 686
        return 0;
    }
687 688 689 690
    bool is_single_connect = (options.type == ZMQ_DEALER ||
                              options.type == ZMQ_SUB ||
                              options.type == ZMQ_REQ);
    if (unlikely (is_single_connect)) {
Martin Hurton's avatar
Martin Hurton committed
691
        const endpoints_t::iterator it = endpoints.find (addr_);
692 693 694 695
        if (it != endpoints.end ()) {
            // There is no valid use for multiple connects for SUB-PUB nor
            // DEALER-ROUTER nor REQ-REP. Multiple connects produces
            // nonsensical results.
somdoron's avatar
somdoron committed
696
            EXIT_MUTEX();
697 698 699
            return 0;
        }
    }
700

701 702 703 704
    //  Choose the I/O thread to run the session in.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
somdoron's avatar
somdoron committed
705
        EXIT_MUTEX();
706 707 708
        return -1;
    }

709
    address_t *paddr = new (std::nothrow) address_t (protocol, address);
710
    alloc_assert (paddr);
711 712 713

    //  Resolve address (if needed by the protocol)
    if (protocol == "tcp") {
714 715 716 717 718 719 720 721 722
        //  Do some basic sanity checks on tcp:// address syntax
        //  - hostname starts with digit or letter, with embedded '-' or '.'
        //  - IPv6 address may contain hex chars and colons.
        //  - IPv4 address may contain decimal digits and dots.
        //  - Address must end in ":port" where port is *, or numeric
        //  - Address may contain two parts separated by ':'
        //  Following code is quick and dirty check to catch obvious errors,
        //  without trying to be fully accurate.
        const char *check = address.c_str ();
723
        if (isalnum (*check) || isxdigit (*check) || *check == '[') {
724 725 726
            check++;
            while (isalnum  (*check)
                || isxdigit (*check)
727 728
                || *check == '.' || *check == '-' || *check == ':'|| *check == ';'
                || *check == ']')
729 730 731 732 733 734 735 736 737 738 739 740 741 742 743
                check++;
        }
        //  Assume the worst, now look for success
        rc = -1;
        //  Did we reach the end of the address safely?
        if (*check == 0) {
            //  Do we have a valid port string? (cannot be '*' in connect
            check = strrchr (address.c_str (), ':');
            if (check) {
                check++;
                if (*check && (isdigit (*check)))
                    rc = 0;     //  Valid
            }
        }
        if (rc == -1) {
744
            errno = EINVAL;
745
            delete paddr;
somdoron's avatar
somdoron committed
746
            EXIT_MUTEX();
747 748 749
            return -1;
        }
        //  Defer resolution until a socket is opened
750
        paddr->resolved.tcp_addr = NULL;
751
    }
752
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
753 754
    else
    if (protocol == "ipc") {
755
        paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
756
        alloc_assert (paddr->resolved.ipc_addr);
757 758 759
        int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
        if (rc != 0) {
            delete paddr;
somdoron's avatar
somdoron committed
760
            EXIT_MUTEX();
761 762 763
            return -1;
        }
    }
764
#endif
bebopagogo's avatar
bebopagogo committed
765 766 767
    
// TBD - Should we check address for ZMQ_HAVE_NORM???
    
768 769 770 771 772 773 774
#ifdef ZMQ_HAVE_OPENPGM
    if (protocol == "pgm" || protocol == "epgm") {
        struct pgm_addrinfo_t *res = NULL;
        uint16_t port_number = 0;
        int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number);
        if (res != NULL)
            pgm_freeaddrinfo (res);
775 776 777 778
        if (rc != 0 || port_number == 0) {
          EXIT_MUTEX();
          return -1;
        }
779
    }
780
#endif
781
#if defined ZMQ_HAVE_TIPC
782 783 784 785 786 787 788
    else
    if (protocol == "tipc") {
        paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
        alloc_assert (paddr->resolved.tipc_addr);
        int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
        if (rc != 0) {
            delete paddr;
somdoron's avatar
somdoron committed
789
            EXIT_MUTEX();
790 791 792 793 794
            return -1;
        }
    }
#endif

795
    //  Create session.
796
    session_base_t *session = session_base_t::create (io_thread, true, this,
797
        options, paddr);
798
    errno_assert (session);
799

800
    //  PGM does not support subscription forwarding; ask for all data to be
bebopagogo's avatar
bebopagogo committed
801 802
    //  sent to this pipe. (same for NORM, currently?)
    bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm";
803
    pipe_t *newpipe = NULL;
804

805
    if (options.immediate != 1 || subscribe_to_all) {
806 807
        //  Create a bi-directional pipe.
        object_t *parents [2] = {this, session};
808
        pipe_t *new_pipes [2] = {NULL, NULL};
809 810 811 812 813 814 815 816 817 818 819

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : options.sndhwm,
            conflate? -1 : options.rcvhwm};
        bool conflates [2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
820
        rc = pipepair (parents, new_pipes, hwms, conflates);
821
        errno_assert (rc == 0);
822

823
        //  Attach local end of the pipe to the socket object.
824
        attach_pipe (new_pipes [0], subscribe_to_all);
825
        newpipe = new_pipes [0];
Martin Sustrik's avatar
Martin Sustrik committed
826

827
        //  Attach remote end of the pipe to the session object later on.
828
        session->attach_pipe (new_pipes [1]);
829 830 831
    }

    //  Save last endpoint URI
832
    paddr->to_string (last_endpoint);
833

834
    add_endpoint (addr_, (own_t *) session, newpipe);
somdoron's avatar
somdoron committed
835
    EXIT_MUTEX();
836 837 838
    return 0;
}

839
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
840
{
841
    //  Activate the session. Make it a child of this socket.
842
    launch_child (endpoint_);
Martin Hurton's avatar
Martin Hurton committed
843
    endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe)));
844 845 846 847
}

int zmq::socket_base_t::term_endpoint (const char *addr_)
{
somdoron's avatar
somdoron committed
848 849
    ENTER_MUTEX();

850 851 852
    //  Check whether the library haven't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
somdoron's avatar
somdoron committed
853
        EXIT_MUTEX();
854 855
        return -1;
    }
malosek's avatar
malosek committed
856

857
    //  Check whether endpoint address passed to the function is valid.
858 859
    if (unlikely (!addr_)) {
        errno = EINVAL;
somdoron's avatar
somdoron committed
860
        EXIT_MUTEX();
861 862 863
        return -1;
    }

864 865 866
    //  Process pending commands, if any, since there could be pending unprocessed process_own()'s
    //  (from launch_child() for example) we're asked to terminate now.
    int rc = process_commands (0, false);
somdoron's avatar
somdoron committed
867 868
    if (unlikely(rc != 0)) {
        EXIT_MUTEX();
869
        return -1;
somdoron's avatar
somdoron committed
870
    }
871

872 873 874
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
somdoron's avatar
somdoron committed
875 876
    if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) {
        EXIT_MUTEX();
877
        return -1;
somdoron's avatar
somdoron committed
878
    }
879 880 881

    // Disconnect an inproc socket
    if (protocol == "inproc") {
somdoron's avatar
somdoron committed
882 883
        if (unregister_endpoint (std::string(addr_), this) == 0) {
            EXIT_MUTEX();
Martin Hurton's avatar
Martin Hurton committed
884
            return 0;
somdoron's avatar
somdoron committed
885
        }
886 887 888
        std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
        if (range.first == range.second) {
            errno = ENOENT;
somdoron's avatar
somdoron committed
889
            EXIT_MUTEX();
890 891
            return -1;
        }
Martin Hurton's avatar
Martin Hurton committed
892

893
        for (inprocs_t::iterator it = range.first; it != range.second; ++it)
Martin Hurton's avatar
Martin Hurton committed
894
            it->second->terminate (true);
895
        inprocs.erase (range.first, range.second);
somdoron's avatar
somdoron committed
896
        EXIT_MUTEX();
897 898 899
        return 0;
    }

900 901
    //  Find the endpoints range (if any) corresponding to the addr_ string.
    std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
902 903
    if (range.first == range.second) {
        errno = ENOENT;
somdoron's avatar
somdoron committed
904
        EXIT_MUTEX();
905
        return -1;
906
    }
907

908 909 910
    for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
        //  If we have an associated pipe, terminate it.
        if (it->second.second != NULL)
Martin Hurton's avatar
Martin Hurton committed
911
            it->second.second->terminate (false);
912
        term_child (it->second.first);
913
    }
914
    endpoints.erase (range.first, range.second);
somdoron's avatar
somdoron committed
915
    EXIT_MUTEX();
916
    return 0;
917 918
}

919
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
920
{
somdoron's avatar
somdoron committed
921 922
    ENTER_MUTEX();

923
    //  Check whether the library haven't been shut down yet.
924
    if (unlikely (ctx_terminated)) {
925
        errno = ETERM;
somdoron's avatar
somdoron committed
926
        EXIT_MUTEX();
927 928 929
        return -1;
    }

930
    //  Check whether message passed to the function is valid.
931
    if (unlikely (!msg_ || !msg_->check ())) {
932
        errno = EFAULT;
somdoron's avatar
somdoron committed
933
        EXIT_MUTEX();
934 935 936
        return -1;
    }

937
    //  Process pending commands, if any.
938
    int rc = process_commands (0, true);
somdoron's avatar
somdoron committed
939 940
    if (unlikely (rc != 0)) {
        EXIT_MUTEX();
941
        return -1;
somdoron's avatar
somdoron committed
942
    }
943

944 945 946
    //  Clear any user-visible flags that are set on the message.
    msg_->reset_flags (msg_t::more);

947
    //  At this point we impose the flags on the message.
948
    if (flags_ & ZMQ_SNDMORE)
949
        msg_->set_flags (msg_t::more);
950

951 952
    msg_->reset_metadata ();

Martin Sustrik's avatar
Martin Sustrik committed
953
    //  Try to send the message.
954
    rc = xsend (msg_);
somdoron's avatar
somdoron committed
955 956
    if (rc == 0) {
        EXIT_MUTEX();
957
        return 0;
somdoron's avatar
somdoron committed
958 959 960
    }
    if (unlikely (errno != EAGAIN)) {
        EXIT_MUTEX();
961
        return -1;
somdoron's avatar
somdoron committed
962
    }
Martin Sustrik's avatar
Martin Sustrik committed
963

964
    //  In case of non-blocking send we'll simply propagate
965
    //  the error - including EAGAIN - up the stack.
somdoron's avatar
somdoron committed
966 967
    if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) {
        EXIT_MUTEX();
Martin Sustrik's avatar
Martin Sustrik committed
968
        return -1;
somdoron's avatar
somdoron committed
969
    }
Martin Sustrik's avatar
Martin Sustrik committed
970

971
    //  Compute the time when the timeout should occur.
Matt Arsenault's avatar
Matt Arsenault committed
972
    //  If the timeout is infinite, don't care.
973 974 975
    int timeout = options.sndtimeo;
    uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);

976 977
    //  Oops, we couldn't send the message. Wait for the next
    //  command, process it and try to send the message again.
978 979
    //  If timeout is reached in the meantime, return EAGAIN.
    while (true) {
somdoron's avatar
somdoron committed
980 981
        if (unlikely (process_commands (timeout, false) != 0)) {
            EXIT_MUTEX();
982
            return -1;
somdoron's avatar
somdoron committed
983
        }        
984
        rc = xsend (msg_);
985 986
        if (rc == 0)
            break;
somdoron's avatar
somdoron committed
987 988
        if (unlikely (errno != EAGAIN)) {
            EXIT_MUTEX();
989
            return -1;
somdoron's avatar
somdoron committed
990
        }
991
        if (timeout > 0) {
992
            timeout = (int) (end - clock.now_ms ());
993 994
            if (timeout <= 0) {
                errno = EAGAIN;
somdoron's avatar
somdoron committed
995
                EXIT_MUTEX();
996 997 998
                return -1;
            }
        }
999
    }
somdoron's avatar
somdoron committed
1000 1001

    EXIT_MUTEX();
Martin Sustrik's avatar
Martin Sustrik committed
1002
    return 0;
1003 1004
}

1005
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
1006
{
somdoron's avatar
somdoron committed
1007 1008
    ENTER_MUTEX();

1009
    //  Check whether the library haven't been shut down yet.
1010
    if (unlikely (ctx_terminated)) {
1011
        errno = ETERM;
somdoron's avatar
somdoron committed
1012
        EXIT_MUTEX();
1013 1014 1015
        return -1;
    }

1016
    //  Check whether message passed to the function is valid.
1017
    if (unlikely (!msg_ || !msg_->check ())) {
1018
        errno = EFAULT;
somdoron's avatar
somdoron committed
1019
        EXIT_MUTEX();
1020 1021 1022
        return -1;
    }

1023 1024 1025 1026 1027 1028 1029
    //  Once every inbound_poll_rate messages check for signals and process
    //  incoming commands. This happens only if we are not polling altogether
    //  because there are messages available all the time. If poll occurs,
    //  ticks is set to zero and thus we avoid this code.
    //
    //  Note that 'recv' uses different command throttling algorithm (the one
    //  described above) from the one used by 'send'. This is because counting
Martin Sustrik's avatar
Martin Sustrik committed
1030
    //  ticks is more efficient than doing RDTSC all the time.
1031
    if (++ticks == inbound_poll_rate) {
somdoron's avatar
somdoron committed
1032 1033
        if (unlikely (process_commands (0, false) != 0)) {
            EXIT_MUTEX();
1034
            return -1;
somdoron's avatar
somdoron committed
1035
        }
1036 1037 1038
        ticks = 0;
    }

Martin Hurton's avatar
Martin Hurton committed
1039
    //  Get the message.
1040
    int rc = xrecv (msg_);
somdoron's avatar
somdoron committed
1041 1042
    if (unlikely (rc != 0 && errno != EAGAIN)) {
        EXIT_MUTEX();
Martin Hurton's avatar
Martin Hurton committed
1043
        return -1;
somdoron's avatar
somdoron committed
1044
    }
Martin Hurton's avatar
Martin Hurton committed
1045

1046
    //  If we have the message, return immediately.
1047
    if (rc == 0) {
1048
        if (file_desc != retired_fd)
1049
            msg_->set_fd(file_desc);
1050
        extract_flags (msg_);
somdoron's avatar
somdoron committed
1051
        EXIT_MUTEX();
1052
        return 0;
1053
    }
1054

Martin Sustrik's avatar
Martin Sustrik committed
1055
    //  If the message cannot be fetched immediately, there are two scenarios.
1056 1057 1058
    //  For non-blocking recv, commands are processed in case there's an
    //  activate_reader command already waiting int a command pipe.
    //  If it's not, return EAGAIN.
1059
    if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) {
somdoron's avatar
somdoron committed
1060 1061
        if (unlikely (process_commands (0, false) != 0)) {
            EXIT_MUTEX();
1062
            return -1;
somdoron's avatar
somdoron committed
1063
        }
1064
        ticks = 0;
1065

1066
        rc = xrecv (msg_);
somdoron's avatar
somdoron committed
1067 1068
        if (rc < 0) {
            EXIT_MUTEX();
1069
            return rc;
somdoron's avatar
somdoron committed
1070
        }
1071
        if (file_desc != retired_fd)
1072
            msg_->set_fd(file_desc);
1073
        extract_flags (msg_);
somdoron's avatar
somdoron committed
1074 1075

        EXIT_MUTEX();
1076
        return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1077 1078
    }

1079
    //  Compute the time when the timeout should occur.
Matt Arsenault's avatar
Matt Arsenault committed
1080
    //  If the timeout is infinite, don't care.
1081 1082 1083
    int timeout = options.rcvtimeo;
    uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);

1084 1085
    //  In blocking scenario, commands are processed over and over again until
    //  we are able to fetch a message.
1086
    bool block = (ticks != 0);
1087
    while (true) {
somdoron's avatar
somdoron committed
1088 1089
        if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
            EXIT_MUTEX();
1090
            return -1;
somdoron's avatar
somdoron committed
1091
        }
1092
        rc = xrecv (msg_);
1093 1094 1095 1096
        if (rc == 0) {
            ticks = 0;
            break;
        }
somdoron's avatar
somdoron committed
1097 1098
        if (unlikely (errno != EAGAIN)) {
            EXIT_MUTEX();
1099
            return -1;
somdoron's avatar
somdoron committed
1100
        }
1101
        block = true;
1102
        if (timeout > 0) {
1103
            timeout = (int) (end - clock.now_ms ());
1104 1105
            if (timeout <= 0) {
                errno = EAGAIN;
somdoron's avatar
somdoron committed
1106
                EXIT_MUTEX();
1107 1108 1109
                return -1;
            }
        }
1110
    }
1111

1112
    if (file_desc != retired_fd)
1113
        msg_->set_fd(file_desc);
1114
    extract_flags (msg_);
somdoron's avatar
somdoron committed
1115
    EXIT_MUTEX();
1116
    return 0;
1117 1118 1119 1120
}

int zmq::socket_base_t::close ()
{
1121 1122 1123
    //  Mark the socket as dead
    tag = 0xdeadbeef;
    
1124 1125 1126 1127
    //  Transfer the ownership of the socket from this application thread
    //  to the reaper thread which will take care of the rest of shutdown
    //  process.
    send_reap (this);
1128

1129 1130 1131
    return 0;
}

1132 1133 1134 1135 1136 1137 1138 1139 1140 1141
bool zmq::socket_base_t::has_in ()
{
    return xhas_in ();
}

bool zmq::socket_base_t::has_out ()
{
    return xhas_out ();
}

1142
void zmq::socket_base_t::start_reaping (poller_t *poller_)
1143
{
1144
    //  Plug the socket to the reaper thread.
1145
    poller = poller_;
somdoron's avatar
somdoron committed
1146 1147 1148 1149 1150 1151 1152 1153

    fd_t fd;

    if (!thread_safe)
        fd = ((mailbox_t*)mailbox)->get_fd();
    else {        
        ENTER_MUTEX();

1154 1155
        reaper_signaler =  new signaler_t();

somdoron's avatar
somdoron committed
1156
        //  Add signaler to the safe mailbox
1157 1158
        fd = reaper_signaler->get_fd();        
        ((mailbox_safe_t*)mailbox)->add_signaler(reaper_signaler);
somdoron's avatar
somdoron committed
1159 1160

        //  Send a signal to make sure reaper handle existing commands
1161
        reaper_signaler->send();
somdoron's avatar
somdoron committed
1162 1163 1164 1165 1166

        EXIT_MUTEX();
    }

    handle = poller->add_fd (fd, this);
1167
    poller->set_pollin (handle);
1168 1169 1170 1171 1172

    //  Initialise the termination and check whether it can be deallocated
    //  immediately.
    terminate ();
    check_destroy ();
Martin Hurton's avatar
Martin Hurton committed
1173 1174
}

1175
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
Martin Sustrik's avatar
Martin Sustrik committed
1176
{
1177
    int rc;
1178
    command_t cmd;
1179 1180 1181
    if (timeout_ != 0) {

        //  If we are asked to wait, simply ask mailbox to wait.
somdoron's avatar
somdoron committed
1182
        rc = mailbox->recv (&cmd, timeout_);
1183 1184
    }
    else {
1185

1186 1187 1188
        //  If we are asked not to wait, check whether we haven't processed
        //  commands recently, so that we can throttle the new commands.

Martin Sustrik's avatar
Martin Sustrik committed
1189
        //  Get the CPU's tick counter. If 0, the counter is not available.
Martin Hurton's avatar
Martin Hurton committed
1190
        const uint64_t tsc = zmq::clock_t::rdtsc ();
Martin Sustrik's avatar
Martin Sustrik committed
1191

1192 1193 1194 1195 1196 1197
        //  Optimised version of command processing - it doesn't have to check
        //  for incoming commands each time. It does so only if certain time
        //  elapsed since last command processing. Command delay varies
        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
        //  etc. The optimisation makes sense only on platforms where getting
        //  a timestamp is a very cheap operation (tens of nanoseconds).
Martin Sustrik's avatar
Martin Sustrik committed
1198 1199
        if (tsc && throttle_) {

Martin Sustrik's avatar
Martin Sustrik committed
1200 1201 1202
            //  Check whether TSC haven't jumped backwards (in case of migration
            //  between CPU cores) and whether certain time have elapsed since
            //  last command processing. If it didn't do nothing.
Martin Sustrik's avatar
Martin Sustrik committed
1203
            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
1204
                return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1205
            last_tsc = tsc;
1206 1207 1208
        }

        //  Check whether there are any commands pending for this thread.
somdoron's avatar
somdoron committed
1209
        rc = mailbox->recv (&cmd, 0);
1210
    }
Martin Sustrik's avatar
Martin Sustrik committed
1211

1212 1213
    //  Process all available commands.
    while (rc == 0) {
1214
        cmd.destination->process_command (cmd);
somdoron's avatar
somdoron committed
1215
        rc = mailbox->recv (&cmd, 0);
1216 1217 1218 1219 1220 1221
    }

    if (errno == EINTR)
        return -1;

    zmq_assert (errno == EAGAIN);
1222

1223
    if (ctx_terminated) {
1224 1225
        errno = ETERM;
        return -1;
1226
    }
1227 1228

    return 0;
Martin Sustrik's avatar
Martin Sustrik committed
1229 1230
}

1231
void zmq::socket_base_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
1232
{
1233
    //  Here, someone have called zmq_term while the socket was still alive.
1234
    //  We'll remember the fact so that any blocking call is interrupted and any
1235 1236
    //  further attempt to use the socket will return ETERM. The user is still
    //  responsible for calling zmq_close on the socket though!
1237
    stop_monitor ();
1238
    ctx_terminated = true;
Martin Sustrik's avatar
Martin Sustrik committed
1239 1240
}

1241
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
Martin Sustrik's avatar
Martin Sustrik committed
1242
{
1243
    attach_pipe (pipe_);
Martin Sustrik's avatar
Martin Sustrik committed
1244 1245
}

1246
void zmq::socket_base_t::process_term (int linger_)
1247
{
1248 1249 1250 1251 1252
    //  Unregister all inproc endpoints associated with this socket.
    //  Doing this we make sure that no new pipes from other sockets (inproc)
    //  will be initiated.
    unregister_endpoints (this);

1253 1254
    //  Ask all attached pipes to terminate.
    for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
1255
        pipes [i]->terminate (false);
Martin Sustrik's avatar
Martin Sustrik committed
1256
    register_term_acks ((int) pipes.size ());
1257

1258
    //  Continue the termination process immediately.
1259
    own_t::process_term (linger_);
1260 1261
}

1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273
void zmq::socket_base_t::update_pipe_options(int option_)
{
	if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) 
	{
		for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
		{
			pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
		}
	}

}

1274 1275 1276 1277 1278
void zmq::socket_base_t::process_destroy ()
{
    destroyed = true;
}

1279
int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
1280 1281 1282 1283
{
    errno = EINVAL;
    return -1;
}
1284 1285 1286 1287 1288 1289

bool zmq::socket_base_t::xhas_out ()
{
    return false;
}

1290
int zmq::socket_base_t::xsend (msg_t *)
1291 1292 1293 1294 1295 1296 1297 1298 1299 1300
{
    errno = ENOTSUP;
    return -1;
}

bool zmq::socket_base_t::xhas_in ()
{
    return false;
}

1301
int zmq::socket_base_t::xrecv (msg_t *)
1302 1303 1304 1305 1306
{
    errno = ENOTSUP;
    return -1;
}

1307 1308 1309 1310 1311
zmq::blob_t zmq::socket_base_t::get_credential () const
{
    return blob_t ();
}

1312
void zmq::socket_base_t::xread_activated (pipe_t *)
1313 1314 1315
{
    zmq_assert (false);
}
1316
void zmq::socket_base_t::xwrite_activated (pipe_t *)
1317 1318 1319 1320
{
    zmq_assert (false);
}

1321
void zmq::socket_base_t::xhiccuped (pipe_t *)
1322
{
1323
    zmq_assert (false);
1324 1325
}

1326 1327
void zmq::socket_base_t::in_event ()
{
1328 1329 1330 1331
    //  This function is invoked only once the socket is running in the context
    //  of the reaper thread. Process any commands from other threads/sockets
    //  that may be available at the moment. Ultimately, the socket will
    //  be destroyed.
somdoron's avatar
somdoron committed
1332 1333

    ENTER_MUTEX();
1334 1335 1336 1337 1338
    
    //  If the socket is thread safe we need to unsignal the reaper signaler
    if (thread_safe)
        reaper_signaler->recv();
    
1339
    process_commands (0, false);
somdoron's avatar
somdoron committed
1340
    EXIT_MUTEX();
1341 1342 1343 1344 1345 1346 1347 1348
    check_destroy ();
}

void zmq::socket_base_t::out_event ()
{
    zmq_assert (false);
}

1349
void zmq::socket_base_t::timer_event (int)
1350 1351 1352
{
    zmq_assert (false);
}
1353

1354 1355
void zmq::socket_base_t::check_destroy ()
{
1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371
    //  If the object was already marked as destroyed, finish the deallocation.
    if (destroyed) {

        //  Remove the socket from the reaper's poller.
        poller->rm_fd (handle);

        //  Remove the socket from the context.
        destroy_socket (this);

        //  Notify the reaper about the fact.
        send_reaped ();

        //  Deallocate.
        own_t::process_destroy ();
    }
}
1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382

void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
    xread_activated (pipe_);
}

void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
    xwrite_activated (pipe_);
}

1383 1384
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
{
1385
    if (options.immediate == 1)
1386 1387 1388 1389
        pipe_->terminate (false);
    else
        // Notify derived sockets of the hiccup
        xhiccuped (pipe_);
1390 1391
}

1392
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
1393 1394
{
    //  Notify the specific socket type about the pipe termination.
1395
    xpipe_terminated (pipe_);
1396

1397
    // Remove pipe from inproc pipes
Martin Hurton's avatar
Martin Hurton committed
1398
    for (inprocs_t::iterator it = inprocs.begin (); it != inprocs.end (); ++it)
1399
        if (it->second == pipe_) {
Martin Hurton's avatar
Martin Hurton committed
1400
            inprocs.erase (it);
1401
            break;
1402 1403
        }

1404 1405 1406 1407 1408 1409 1410
    //  Remove the pipe from the list of attached pipes and confirm its
    //  termination if we are already shutting down.
    pipes.erase (pipe_);
    if (is_terminating ())
        unregister_term_ack ();
}

1411 1412
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
1413
    //  Test whether IDENTITY flag is valid for this socket type.
1414
    if (unlikely (msg_->flags () & msg_t::identity))
1415
        zmq_assert (options.recv_identity);
Martin Hurton's avatar
Martin Hurton committed
1416

1417
    //  Remove MORE flag.
1418 1419
    rcvmore = msg_->flags () & msg_t::more ? true : false;
}
1420

1421
int zmq::socket_base_t::monitor (const char *addr_, int events_)
1422
{
1423 1424 1425 1426
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }
1427
    //  Support deregistering monitoring endpoints as well
1428 1429 1430 1431 1432 1433 1434
    if (addr_ == NULL) {
        stop_monitor ();
        return 0;
    }
    //  Parse addr_ string.
    std::string protocol;
    std::string address;
Pieter Hintjens's avatar
Pieter Hintjens committed
1435
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
1436 1437
        return -1;

1438
    //  Event notification only supported over inproc://
1439 1440 1441 1442
    if (protocol != "inproc") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
1443
    //  Register events to monitor
1444
    monitor_events = events_;
Martin Hurton's avatar
Martin Hurton committed
1445
    monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
1446 1447 1448
    if (monitor_socket == NULL)
        return -1;

1449
    //  Never block context termination on pending event messages
1450
    int linger = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
1451
    int rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1452
    if (rc == -1)
Pieter Hintjens's avatar
Pieter Hintjens committed
1453
        stop_monitor ();
1454

1455
    //  Spawn the monitor socket endpoint
1456 1457 1458 1459 1460 1461
    rc = zmq_bind (monitor_socket, addr_);
    if (rc == -1)
         stop_monitor ();
    return rc;
}

1462 1463 1464 1465 1466 1467 1468 1469 1470 1471
void zmq::socket_base_t::set_fd(zmq::fd_t fd_)
{
    file_desc = fd_;
}

zmq::fd_t zmq::socket_base_t::fd()
{
    return file_desc;
}

Martin Hurton's avatar
Martin Hurton committed
1472
void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_)
1473
{
1474 1475
    if (monitor_events & ZMQ_EVENT_CONNECTED)
        monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_);
1476
}
1477

Martin Hurton's avatar
Martin Hurton committed
1478
void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_)
1479
{
1480 1481
    if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED)
        monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_);
1482
}
1483

Martin Hurton's avatar
Martin Hurton committed
1484
void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_)
1485
{
1486 1487
    if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED)
        monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_);
1488 1489
}

Martin Hurton's avatar
Martin Hurton committed
1490
void zmq::socket_base_t::event_listening (const std::string &addr_, int fd_)
1491
{
1492 1493
    if (monitor_events & ZMQ_EVENT_LISTENING)
        monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_);
1494 1495
}

Martin Hurton's avatar
Martin Hurton committed
1496
void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
1497
{
1498 1499
    if (monitor_events & ZMQ_EVENT_BIND_FAILED)
        monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_);
1500 1501
}

Martin Hurton's avatar
Martin Hurton committed
1502
void zmq::socket_base_t::event_accepted (const std::string &addr_, int fd_)
1503
{
1504 1505
    if (monitor_events & ZMQ_EVENT_ACCEPTED)
        monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_);
1506 1507
}

Martin Hurton's avatar
Martin Hurton committed
1508
void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_)
1509
{
1510 1511
    if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED)
        monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_);
1512 1513
}

Martin Hurton's avatar
Martin Hurton committed
1514
void zmq::socket_base_t::event_closed (const std::string &addr_, int fd_)
1515
{
1516 1517
    if (monitor_events & ZMQ_EVENT_CLOSED)
        monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_);
1518
}
Martin Hurton's avatar
Martin Hurton committed
1519 1520

void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
1521
{
1522 1523
    if (monitor_events & ZMQ_EVENT_CLOSE_FAILED)
        monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_);
1524 1525
}

Martin Hurton's avatar
Martin Hurton committed
1526
void zmq::socket_base_t::event_disconnected (const std::string &addr_, int fd_)
1527
{
1528 1529
    if (monitor_events & ZMQ_EVENT_DISCONNECTED)
        monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_);
1530 1531
}

1532 1533
//  Send a monitor event
void zmq::socket_base_t::monitor_event (int event_, int value_, const std::string &addr_)
1534
{
1535
    if (monitor_socket) {
1536
        //  Send event in first frame
1537
        zmq_msg_t msg;
1538
        zmq_msg_init_size (&msg, 6);
1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551
#ifdef ZMQ_HAVE_HPUX
        // avoid SIGBUS
        union {
          uint8_t data[6];
          struct {
            uint16_t event;
            uint32_t value;
          } v;
        } u;
        u.v.event = event_;
        u.v.value = value_;
        memcpy(zmq_msg_data (&msg), u.data, 6);
#else
1552 1553 1554
        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
        *(uint16_t *) (data + 0) = (uint16_t) event_;
        *(uint32_t *) (data + 2) = (uint32_t) value_;
1555
#endif
1556
        zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
1557 1558

        //  Send address in second frame
1559
        zmq_msg_init_size (&msg, addr_.size());
1560
        memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
1561 1562
        zmq_sendmsg (monitor_socket, &msg, 0);
    }
1563 1564
}

1565
void zmq::socket_base_t::stop_monitor (void)
1566 1567
{
    if (monitor_socket) {
1568 1569
        if (monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
            monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
1570 1571 1572 1573
        zmq_close (monitor_socket);
        monitor_socket = NULL;
        monitor_events = 0;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
1574
}