socket_base.cpp 21.5 KB
Newer Older
1
/*
    Copyright (c) 2007-2011 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

21
#include <new>
22
#include <string>
23 24
#include <algorithm>

25
#include "../include/zmq.h"
26

27 28 29 30 31 32 33 34 35 36
#include "platform.hpp"

#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#if defined _MSC_VER
#include <intrin.h>
#endif
#else
#include <unistd.h>
#endif
37

38
#include "socket_base.hpp"
39
#include "zmq_listener.hpp"
40
#include "zmq_connecter.hpp"
41
#include "io_thread.hpp"
42
#include "connect_session.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
43
#include "config.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
44
#include "clock.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
45
#include "pipe.hpp"
46
#include "err.hpp"
47
#include "ctx.hpp"
malosek's avatar
malosek committed
48
#include "platform.hpp"
49
#include "likely.hpp"
50 51
#include "uuid.hpp"

52 53 54 55 56 57 58 59 60
#include "pair.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "pull.hpp"
#include "push.hpp"
#include "xreq.hpp"
#include "xrep.hpp"
61 62
#include "xpub.hpp"
#include "xsub.hpp"
63

64
//  Factory function: instantiates the concrete socket class that matches
//  type_. Returns NULL with errno set to EINVAL when type_ is not one of
//  the socket types handled below.
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
    uint32_t tid_)
{
    socket_base_t *sock = NULL;

    switch (type_) {
    case ZMQ_PAIR:
        sock = new (std::nothrow) pair_t (parent_, tid_);
        break;
    case ZMQ_PUB:
        sock = new (std::nothrow) pub_t (parent_, tid_);
        break;
    case ZMQ_SUB:
        sock = new (std::nothrow) sub_t (parent_, tid_);
        break;
    case ZMQ_REQ:
        sock = new (std::nothrow) req_t (parent_, tid_);
        break;
    case ZMQ_REP:
        sock = new (std::nothrow) rep_t (parent_, tid_);
        break;
    case ZMQ_XREQ:
        sock = new (std::nothrow) xreq_t (parent_, tid_);
        break;
    case ZMQ_XREP:
        sock = new (std::nothrow) xrep_t (parent_, tid_);
        break;
    case ZMQ_PULL:
        sock = new (std::nothrow) pull_t (parent_, tid_);
        break;
    case ZMQ_PUSH:
        sock = new (std::nothrow) push_t (parent_, tid_);
        break;
    case ZMQ_XPUB:
        sock = new (std::nothrow) xpub_t (parent_, tid_);
        break;
    case ZMQ_XSUB:
        sock = new (std::nothrow) xsub_t (parent_, tid_);
        break;
    default:
        //  Unknown socket type.
        errno = EINVAL;
        return NULL;
    }

    //  The allocation above is non-throwing, so check the result here.
    alloc_assert (sock);
    return sock;
}

Martin Sustrik's avatar
Martin Sustrik committed
111 112
//  Constructs the generic part of a socket. parent_ and tid_ are passed
//  straight to the own_t base; all flags start out false and both the
//  TSC timestamp and the tick counter start at zero.
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
    own_t (parent_, tid_),
    ctx_terminated (false),
    destroyed (false),
    last_tsc (0),
    ticks (0),
    rcvmore (false)
{
}

//  Destructor. Verifies that the socket was marked as destroyed (see
//  process_destroy) and that no session is still registered with it.
zmq::socket_base_t::~socket_base_t ()
{
    zmq_assert (destroyed);

    //  A session still present in the map at this point is a leak.
    sessions_sync.lock ();
    zmq_assert (sessions.empty ());
    sessions_sync.unlock ();
}

131
//  Returns a pointer to the mailbox owned by this socket.
zmq::mailbox_t *zmq::socket_base_t::get_mailbox ()
{
    return &mailbox;
}

//  Invoked by the context when it is being terminated (zmq_term).
void zmq::socket_base_t::stop ()
{
    //  The 'stop' command travels from the thread that called zmq_term to
    //  the thread that owns the socket, so that a blocking call in the
    //  owner thread can be interrupted.
    send_stop ();
}

145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
//  Splits uri_ of the form "protocol://address" at the first "://".
//  On success stores both parts in protocol_ and address_ and returns 0.
//  Returns -1 with errno set to EINVAL when the separator is missing or
//  when either part is empty. uri_ must not be NULL.
int zmq::socket_base_t::parse_uri (const char *uri_,
                        std::string &protocol_, std::string &address_)
{
    zmq_assert (uri_ != NULL);

    const std::string full (uri_);
    const std::string::size_type split = full.find ("://");
    if (split != std::string::npos) {

        //  Skip the three characters of the separator itself.
        protocol_ = full.substr (0, split);
        address_ = full.substr (split + 3);
        if (!protocol_.empty () && !address_.empty ())
            return 0;
    }

    //  Either there was no separator or one of the parts was empty.
    errno = EINVAL;
    return -1;
}

165 166 167 168
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
    //  First check out whether the protcol is something we are aware of.
    if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" &&
169
          protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys") {
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
        errno = EPROTONOSUPPORT;
        return -1;
    }

    //  If 0MQ is not compiled with OpenPGM, pgm and epgm transports
    //  are not avaialble.
#if !defined ZMQ_HAVE_OPENPGM
    if (protocol_ == "pgm" || protocol_ == "epgm") {
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

    //  IPC transport is not available on Windows and OpenVMS.
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
185
    if (protocol_ == "ipc") {
186 187 188 189 190 191 192 193 194 195
        //  Unknown protocol.
        errno = EPROTONOSUPPORT;
        return -1;
    }
#endif

    //  Check whether socket type and transport protocol match.
    //  Specifically, multicast protocols can't be combined with
    //  bi-directional messaging patterns (socket types).
    if ((protocol_ == "pgm" || protocol_ == "epgm") &&
196 197
          options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
          options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
198 199 200 201 202 203 204 205
        errno = ENOCOMPATPROTO;
        return -1;
    }

    //  Protocol is available.
    return 0;
}

206 207 208 209 210 211 212 213 214 215 216 217 218 219
void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
    class writer_t *outpipe_, const blob_t &peer_identity_)
{
    // If the peer haven't specified it's identity, let's generate one.
    if (peer_identity_.size ()) {
        xattach_pipes (inpipe_, outpipe_, peer_identity_);
    }
    else {
        blob_t identity (1, 0);
        identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len);
        xattach_pipes (inpipe_, outpipe_, identity);
    }
}

220
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
221
    size_t optvallen_)
222
{
223
    if (unlikely (ctx_terminated)) {
224 225 226 227
        errno = ETERM;
        return -1;
    }

228 229 230 231 232 233 234 235
    //  First, check whether specific socket type overloads the option.
    int rc = xsetsockopt (option_, optval_, optvallen_);
    if (rc == 0 || errno != EINVAL)
        return rc;

    //  If the socket type doesn't support the option, pass it to
    //  the generic option parser.
    return options.setsockopt (option_, optval_, optvallen_);
236 237
}

238 239 240
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
    size_t *optvallen_)
{
241
    if (unlikely (ctx_terminated)) {
242 243 244 245
        errno = ETERM;
        return -1;
    }

246
    if (option_ == ZMQ_RCVMORE) {
247
        if (*optvallen_ < sizeof (int)) {
248 249 250
            errno = EINVAL;
            return -1;
        }
251 252
        *((int*) optval_) = rcvmore ? 1 : 0;
        *optvallen_ = sizeof (int);
253 254 255
        return 0;
    }

256 257 258 259 260
    if (option_ == ZMQ_FD) {
        if (*optvallen_ < sizeof (fd_t)) {
            errno = EINVAL;
            return -1;
        }
261
        *((fd_t*) optval_) = mailbox.get_fd ();
262 263 264 265 266
        *optvallen_ = sizeof (fd_t);
        return 0;
    }

    if (option_ == ZMQ_EVENTS) {
267
        if (*optvallen_ < sizeof (int)) {
268 269 270
            errno = EINVAL;
            return -1;
        }
271
        int rc = process_commands (false, false);
272
        if (rc != 0 && (errno == EINTR || errno == ETERM))
273 274
            return -1;
        errno_assert (rc == 0);
275
        *((int*) optval_) = 0;
276
        if (has_out ())
277
            *((int*) optval_) |= ZMQ_POLLOUT;
278
        if (has_in ())
279 280
            *((int*) optval_) |= ZMQ_POLLIN;
        *optvallen_ = sizeof (int);
281 282 283
        return 0;
    }

284 285 286
    return options.getsockopt (option_, optval_, optvallen_);
}

287 288
int zmq::socket_base_t::bind (const char *addr_)
{
289
    if (unlikely (ctx_terminated)) {
290 291 292 293
        errno = ETERM;
        return -1;
    }

294
    //  Parse addr_ string.
295 296
    std::string protocol;
    std::string address;
297 298 299
    int rc = parse_uri (addr_, protocol, address);
    if (rc != 0)
        return -1;
300

301
    rc = check_protocol (protocol);
302 303
    if (rc != 0)
        return -1;
304

305 306 307 308
    if (protocol == "inproc" || protocol == "sys") {
        endpoint_t endpoint = {this, options};
        return register_endpoint (addr_, endpoint);
    }
309

310
    if (protocol == "tcp" || protocol == "ipc") {
311 312 313 314 315 316 317 318 319

        //  Choose I/O thread to run the listerner in.
        io_thread_t *io_thread = choose_io_thread (options.affinity);
        if (!io_thread) {
            errno = EMTHREAD;
            return -1;
        }

        //  Create and run the listener.
320
        zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
321
            io_thread, this, options);
322
        alloc_assert (listener);
323
        int rc = listener->set_address (protocol.c_str(), address.c_str ());
324 325
        if (rc != 0) {
            delete listener;
326
            return -1;
327
        }
328
        launch_child (listener);
329 330 331 332

        return 0;
    }

333 334 335 336
    if (protocol == "pgm" || protocol == "epgm") {

        //  For convenience's sake, bind can be used interchageable with
        //  connect for PGM and EPGM transports.
337 338 339
        return connect (addr_); 
    }

340
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
341
    return -1;
342 343
}

344
int zmq::socket_base_t::connect (const char *addr_)
345
{
346
    if (unlikely (ctx_terminated)) {
347 348 349 350
        errno = ETERM;
        return -1;
    }

malosek's avatar
malosek committed
351
    //  Parse addr_ string.
352 353
    std::string protocol;
    std::string address;
354 355 356
    int rc = parse_uri (addr_, protocol, address);
    if (rc != 0)
        return -1;
malosek's avatar
malosek committed
357

358
    rc = check_protocol (protocol);
359 360
    if (rc != 0)
        return -1;
malosek's avatar
malosek committed
361

362
    if (protocol == "inproc" || protocol == "sys") {
363

364 365 366 367
        //  TODO: inproc connect is specific with respect to creating pipes
        //  as there's no 'reconnect' functionality implemented. Once that
        //  is in place we should follow generic pipe creation algorithm.

368 369 370
        //  Find the peer endpoint.
        endpoint_t peer = find_endpoint (addr_);
        if (!peer.socket)
371 372
            return -1;

373 374 375 376
        reader_t *inpipe_reader = NULL;
        writer_t *inpipe_writer = NULL;
        reader_t *outpipe_reader = NULL;
        writer_t *outpipe_writer = NULL;
377 378

        // The total HWM for an inproc connection should be the sum of
379
        // the binder's HWM and the connector's HWM.
380 381 382 383
        int  sndhwm;
        int  rcvhwm;
        if (options.sndhwm == 0 || peer.options.rcvhwm == 0)
            sndhwm = 0;
384
        else
385 386 387 388 389
            sndhwm = options.sndhwm + peer.options.rcvhwm;
        if (options.rcvhwm == 0 || peer.options.sndhwm == 0)
            rcvhwm = 0;
        else
            rcvhwm = options.rcvhwm + peer.options.sndhwm;
390

391
        //  Create inbound pipe, if required.
392
        if (options.requires_in)
393
            create_pipe (this, peer.socket, rcvhwm, &inpipe_reader,
394
                &inpipe_writer);
395 396

        //  Create outbound pipe, if required.
397
        if (options.requires_out)
398
            create_pipe (peer.socket, this, sndhwm, &outpipe_reader,
399
                &outpipe_writer);
400 401

        //  Attach the pipes to this socket object.
402
        attach_pipes (inpipe_reader, outpipe_writer, peer.options.identity);
403 404

        //  Attach the pipes to the peer socket. Note that peer's seqnum
405 406
        //  was incremented in find_endpoint function. We don't need it
        //  increased here.
407
        send_bind (peer.socket, outpipe_reader, inpipe_writer,
408
            options.identity, false);
409 410 411 412

        return 0;
    }

413 414 415 416 417 418 419
    //  Choose the I/O thread to run the session in.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        return -1;
    }

420 421
    //  Create session.
    connect_session_t *session = new (std::nothrow) connect_session_t (
422
        io_thread, this, options, protocol.c_str (), address.c_str ());
423
    alloc_assert (session);
Martin Sustrik's avatar
Martin Sustrik committed
424

425
    //  If 'immediate connect' feature is required, we'll create the pipes
426 427 428
    //  to the session straight away. Otherwise, they'll be created by the
    //  session once the connection is established.
    if (options.immediate_connect) {
429

430 431 432 433
        reader_t *inpipe_reader = NULL;
        writer_t *inpipe_writer = NULL;
        reader_t *outpipe_reader = NULL;
        writer_t *outpipe_writer = NULL;
434

435
        //  Create inbound pipe, if required.
436
        if (options.requires_in)
437
            create_pipe (this, session, options.rcvhwm,
438
                &inpipe_reader, &inpipe_writer);
439

440
        //  Create outbound pipe, if required.
441
        if (options.requires_out)
442
            create_pipe (session, this, options.sndhwm,
443
                &outpipe_reader, &outpipe_writer);
444

445
        //  Attach the pipes to the socket object.
446
        attach_pipes (inpipe_reader, outpipe_writer, blob_t ());
447 448

        //  Attach the pipes to the session object.
449
        session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ());
450
    }
Martin Sustrik's avatar
Martin Sustrik committed
451

452 453
    //  Activate the session. Make it a child of this socket.
    launch_child (session);
malosek's avatar
malosek committed
454

455
    return 0;
456 457
}

458
//  Sends msg_. With ZMQ_DONTWAIT in flags_ the call never blocks and any
//  error from xsend (including EAGAIN) is reported straight away.
//  Otherwise the call keeps waiting for commands and retrying for as long
//  as xsend fails with EAGAIN. Returns 0 on success, -1 on failure.
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Process pending commands, if any (throttled).
    if (unlikely (process_commands (false, true) != 0))
        return -1;

    //  At this point we impose the MORE flag on the message.
    if (flags_ & ZMQ_SNDMORE)
        msg_->flags |= ZMQ_MSG_MORE;

    //  Try to send the message.
    int rc = xsend (msg_, flags_);
    if (rc == 0)
        return 0;

    //  In case of non-blocking send we'll simply propagate
    //  the error - including EAGAIN - upwards.
    if (flags_ & ZMQ_DONTWAIT)
        return -1;

    //  Oops, we couldn't send the message. Wait for the next
    //  command, process it and try to send the message again.
    //  rc is known to be non-zero here, so the body runs at least once.
    do {
        if (errno != EAGAIN)
            return -1;
        if (unlikely (process_commands (true, false) != 0))
            return -1;
        rc = xsend (msg_, flags_);
    } while (rc != 0);

    return 0;
}

496
//  Receives a message into msg_. With ZMQ_DONTWAIT in flags_ the call
//  never blocks; otherwise it processes commands and retries for as long
//  as xrecv fails with EAGAIN. On success the MORE flag is moved from the
//  message into 'rcvmore' and 0 is returned; on failure -1 is returned.
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Get the message. Remember errno, as command processing below may
    //  overwrite it.
    int rc = xrecv (msg_, flags_);
    const int saved_errno = errno;

    //  Once every inbound_poll_rate messages check for signals and process
    //  incoming commands. This happens only if we are not polling altogether
    //  because there are messages available all the time. If poll occurs,
    //  ticks is set to zero and thus we avoid this code.
    //
    //  Note that 'recv' uses different command throttling algorithm (the one
    //  described above) from the one used by 'send'. This is because counting
    //  ticks is more efficient than doing RDTSC all the time.
    if (++ticks == inbound_poll_rate) {
        if (unlikely (process_commands (false, false) != 0))
            return -1;
        ticks = 0;
    }

    //  If we have the message, return immediately.
    if (rc == 0) {
        rcvmore = msg_->flags & ZMQ_MSG_MORE;
        if (rcvmore)
            msg_->flags &= ~ZMQ_MSG_MORE;
        return 0;
    }

    //  If we don't have the message, restore the original cause of the
    //  problem.
    errno = saved_errno;

    //  If the message cannot be fetched immediately, there are two scenarios.
    //  For non-blocking recv, commands are processed in case there's an
    //  activate_reader command already waiting in a command pipe.
    //  If it's not, return EAGAIN.
    if (flags_ & ZMQ_DONTWAIT) {
        if (errno != EAGAIN)
            return -1;
        if (unlikely (process_commands (false, false) != 0))
            return -1;
        ticks = 0;

        //  Single retry; its result is what the caller gets.
        rc = xrecv (msg_, flags_);
        if (rc == 0) {
            rcvmore = msg_->flags & ZMQ_MSG_MORE;
            if (rcvmore)
                msg_->flags &= ~ZMQ_MSG_MORE;
        }
        return rc;
    }

    //  In blocking scenario, commands are processed over and over again until
    //  we are able to fetch a message. The first pass blocks only when
    //  'ticks' is non-zero; every later pass blocks. rc is known to be
    //  non-zero here, so the body runs at least once.
    bool block = (ticks != 0);
    do {
        if (errno != EAGAIN)
            return -1;
        if (unlikely (process_commands (block, false) != 0))
            return -1;
        rc = xrecv (msg_, flags_);
        ticks = 0;
        block = true;
    } while (rc != 0);

    rcvmore = msg_->flags & ZMQ_MSG_MORE;
    if (rcvmore)
        msg_->flags &= ~ZMQ_MSG_MORE;
    return 0;
}

int zmq::socket_base_t::close ()
{
573 574 575 576
    //  Transfer the ownership of the socket from this application thread
    //  to the reaper thread which will take care of the rest of shutdown
    //  process.
    send_reap (this);
577

578 579 580
    return 0;
}

581 582 583 584 585 586 587 588 589 590
//  Forwards to xhas_in; getsockopt reports ZMQ_POLLIN when this is true.
bool zmq::socket_base_t::has_in ()
{
    return xhas_in ();
}

//  Forwards to xhas_out; getsockopt reports ZMQ_POLLOUT when this is true.
bool zmq::socket_base_t::has_out ()
{
    return xhas_out ();
}

591
bool zmq::socket_base_t::register_session (const blob_t &name_,
592
    session_t *session_)
593 594
{
    sessions_sync.lock ();
595
    bool registered = sessions.insert (
596
        sessions_t::value_type (name_, session_)).second;
597
    sessions_sync.unlock ();
598
    return registered;
599 600
}

601
void zmq::socket_base_t::unregister_session (const blob_t &name_)
602 603
{
    sessions_sync.lock ();
604
    sessions_t::iterator it = sessions.find (name_);
605 606
    zmq_assert (it != sessions.end ());
    sessions.erase (it);
607 608 609
    sessions_sync.unlock ();
}

610
//  Looks up the session registered under name_. Returns the registered
//  pointer, or NULL when the name is unknown. A non-NULL session has its
//  sequence number incremented before it is returned.
zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
{
    session_t *found = NULL;

    sessions_sync.lock ();
    sessions_t::iterator pos = sessions.find (name_);
    if (pos != sessions.end ()) {
        found = pos->second;

        //  Prepare the session for subsequent attach command.
        //  Note the connect sessions have NULL pointers registered here.
        if (found)
            found->inc_seqnum ();
    }
    sessions_sync.unlock ();

    return found;
}

629
//  Registers the socket's mailbox descriptor with poller_ and asks to be
//  notified when it becomes readable. The poller and the handle are
//  remembered; check_destroy uses them to deregister.
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
    poller = poller_;
    handle = poller->add_fd (mailbox.get_fd (), this);
    poller->set_pollin (handle);
}

636
int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
Martin Sustrik's avatar
Martin Sustrik committed
637
{
638
    int rc;
639 640
    command_t cmd;
    if (block_) {
641
        rc = mailbox.recv (&cmd, true);
642 643 644
        if (rc == -1 && errno == EINTR)
            return -1;
        errno_assert (rc == 0);
645 646
    }
    else {
647

Martin Sustrik's avatar
Martin Sustrik committed
648 649 650
        //  Get the CPU's tick counter. If 0, the counter is not available.
        uint64_t tsc = zmq::clock_t::rdtsc ();

651 652 653 654 655 656
        //  Optimised version of command processing - it doesn't have to check
        //  for incoming commands each time. It does so only if certain time
        //  elapsed since last command processing. Command delay varies
        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
        //  etc. The optimisation makes sense only on platforms where getting
        //  a timestamp is a very cheap operation (tens of nanoseconds).
Martin Sustrik's avatar
Martin Sustrik committed
657 658
        if (tsc && throttle_) {

Martin Sustrik's avatar
Martin Sustrik committed
659 660 661
            //  Check whether TSC haven't jumped backwards (in case of migration
            //  between CPU cores) and whether certain time have elapsed since
            //  last command processing. If it didn't do nothing.
Martin Sustrik's avatar
Martin Sustrik committed
662
            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
663
                return 0;
Martin Sustrik's avatar
Martin Sustrik committed
664
            last_tsc = tsc;
665 666 667
        }

        //  Check whether there are any commands pending for this thread.
668
        rc = mailbox.recv (&cmd, false);
669
    }
Martin Sustrik's avatar
Martin Sustrik committed
670

671
    //  Process all the commands available at the moment.
672 673 674 675 676 677
    while (true) {
        if (rc == -1 && errno == EAGAIN)
            break;
        if (rc == -1 && errno == EINTR)
            return -1;
        errno_assert (rc == 0);
678
        cmd.destination->process_command (cmd);
679
        rc = mailbox.recv (&cmd, false);
680 681
     }

682
    if (ctx_terminated) {
683 684
        errno = ETERM;
        return -1;
685
    }
686 687

    return 0;
Martin Sustrik's avatar
Martin Sustrik committed
688 689
}

690
void zmq::socket_base_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
691
{
692
    //  Here, someone have called zmq_term while the socket was still alive.
693
    //  We'll remember the fact so that any blocking call is interrupted and any
694 695
    //  further attempt to use the socket will return ETERM. The user is still
    //  responsible for calling zmq_close on the socket though!
696
    ctx_terminated = true;
Martin Sustrik's avatar
Martin Sustrik committed
697 698
}

699 700
//  Handler for the 'bind' command: attaches the pipes supplied by the
//  peer, together with the peer's identity, to this socket.
void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
    const blob_t &peer_identity_)
{
    attach_pipes (in_pipe_, out_pipe_, peer_identity_);
}

705
//  Handler for the 'unplug' command. Intentionally does nothing.
void zmq::socket_base_t::process_unplug ()
{
}

709
void zmq::socket_base_t::process_term (int linger_)
710
{
711 712 713 714 715 716
    //  Unregister all inproc endpoints associated with this socket.
    //  Doing this we make sure that no new pipes from other sockets (inproc)
    //  will be initiated.
    unregister_endpoints (this);

    //  Continue the termination process immediately.
717
    own_t::process_term (linger_);
718 719
}

720 721 722 723 724
//  Only marks the socket as destroyed. The actual deallocation is
//  carried out later by check_destroy.
void zmq::socket_base_t::process_destroy ()
{
    destroyed = true;
}

725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753
//  Default socket-type-specific option setter: recognises no option and
//  fails with EINVAL, which makes setsockopt fall back to the generic
//  option parser.
int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_,
    size_t optvallen_)
{
    errno = EINVAL;
    return -1;
}

//  Default implementation: always reports false.
bool zmq::socket_base_t::xhas_out ()
{
    return false;
}

//  Default implementation: sending is not supported. Always fails with
//  errno set to ENOTSUP.
int zmq::socket_base_t::xsend (zmq_msg_t *msg_, int options_)
{
    errno = ENOTSUP;
    return -1;
}

//  Default implementation: always reports false.
bool zmq::socket_base_t::xhas_in ()
{
    return false;
}

//  Default implementation: receiving is not supported. Always fails with
//  errno set to ENOTSUP.
int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_)
{
    errno = ENOTSUP;
    return -1;
}

754 755 756 757 758
void zmq::socket_base_t::in_event ()
{
    //  Process any commands from other threads/sockets that may be available
    //  at the moment. Ultimately, socket will be destroyed.
    process_commands (false, false);
759 760 761 762 763 764 765 766 767 768 769 770
    check_destroy ();
}

//  Poller callback for output readiness. Never expected to be called;
//  start_reaping only asks for input notifications.
void zmq::socket_base_t::out_event ()
{
    zmq_assert (false);
}

//  Poller callback for timers. Never expected to be called; reaching it
//  is treated as a programming error.
void zmq::socket_base_t::timer_event (int id_)
{
    zmq_assert (false);
}
771

772 773
void zmq::socket_base_t::check_destroy ()
{
774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789
    //  If the object was already marked as destroyed, finish the deallocation.
    if (destroyed) {

        //  Remove the socket from the reaper's poller.
        poller->rm_fd (handle);

        //  Remove the socket from the context.
        destroy_socket (this);

        //  Notify the reaper about the fact.
        send_reaped ();

        //  Deallocate.
        own_t::process_destroy ();
    }
}