session_base.cpp 17.8 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "macros.hpp"
31
#include "session_base.hpp"
32
#include "i_engine.hpp"
33
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
34
#include "pipe.hpp"
35
#include "likely.hpp"
36
#include "tcp_connecter.hpp"
37
#include "ipc_connecter.hpp"
38
#include "tipc_connecter.hpp"
39
#include "socks_connecter.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
40
#include "vmci_connecter.hpp"
41 42
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
43
#include "address.hpp"
bebopagogo's avatar
bebopagogo committed
44
#include "norm_engine.hpp"
45

46
#include "ctx.hpp"
47 48 49
#include "req.hpp"

//  Factory for session objects. REQ sockets get the specialised
//  req_session_t; every other socket type listed below shares the generic
//  session_base_t. For any other socket type errno is set to EINVAL and
//  NULL is returned. The result of the allocation is checked with
//  alloc_assert before it is handed back.
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
    bool active_, class socket_base_t *socket_, const options_t &options_,
    address_t *addr_)
{
    session_base_t *session = NULL;

    switch (options_.type) {
        case ZMQ_REQ:
            session = new (std::nothrow) req_session_t (
                io_thread_, active_, socket_, options_, addr_);
            break;

        case ZMQ_DEALER:
        case ZMQ_REP:
        case ZMQ_ROUTER:
        case ZMQ_PUB:
        case ZMQ_XPUB:
        case ZMQ_SUB:
        case ZMQ_XSUB:
        case ZMQ_PUSH:
        case ZMQ_PULL:
        case ZMQ_PAIR:
        case ZMQ_STREAM:
        case ZMQ_SERVER:
        case ZMQ_CLIENT:
            session = new (std::nothrow) session_base_t (
                io_thread_, active_, socket_, options_, addr_);
            break;

        default:
            //  Socket type not handled by this factory.
            errno = EINVAL;
            return NULL;
    }

    alloc_assert (session);
    return session;
}

//  Builds an idle session: no data pipe, no ZAP pipe, no engine and no
//  linger timer armed. 'active_' is only recorded here; process_plug ()
//  starts connecting when it is set. 'addr_' is stored and later released
//  by the destructor via LIBZMQ_DELETE.
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
      bool active_, class socket_base_t *socket_, const options_t &options_,
      address_t *addr_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    active (active_),
    pipe (NULL),
    zap_pipe (NULL),
    incomplete_in (false),
    pending (false),
    engine (NULL),
    socket (socket_),
    io_thread (io_thread_),
    has_linger_timer (false),
    addr (addr_)
{
    //  Nothing else to do; all state is set in the initialiser list.
}

101
//  Tears the session down. Both pipes must already be gone by the time
//  the destructor runs; what is left to release is the linger timer,
//  the engine and the address object.
zmq::session_base_t::~session_base_t ()
{
    zmq_assert (!pipe);
    zmq_assert (!zap_pipe);

    //  A linger timer may still be armed; disarm it.
    if (has_linger_timer) {
        cancel_timer (linger_timer_id);
        has_linger_timer = false;
    }

    //  Shut down the engine, if one is still attached.
    if (engine != NULL)
        engine->terminate ();

    //  Release the address handed over at construction time.
    LIBZMQ_DELETE(addr);
}
118

119
//  Installs 'pipe_' as the session's data pipe and registers the session
//  as the receiver of that pipe's events. Only legal while the session is
//  not terminating and has no data pipe yet.
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
{
    zmq_assert (!is_terminating ());
    zmq_assert (!pipe);
    zmq_assert (pipe_);

    pipe_->set_event_sink (this);
    pipe = pipe_;
}

128
//  Reads the next message from the data pipe into 'msg_'.
//  Returns 0 on success. Returns -1 with errno set to EAGAIN when there
//  is no pipe or the pipe has nothing to read.
//  As a side effect, 'incomplete_in' records whether the message just read
//  carries the 'more' flag.
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
    const bool got_message = pipe != NULL && pipe->read (msg_);
    if (!got_message) {
        errno = EAGAIN;
        return -1;
    }

    incomplete_in = (msg_->flags () & msg_t::more) != 0;
    return 0;
}

140
//  Writes 'msg_' to the data pipe.
//  Messages flagged as commands are not forwarded; 0 is returned for them
//  and 'msg_' is left untouched.
//  After a successful write 'msg_' is re-initialised and 0 is returned.
//  Returns -1 with errno set to EAGAIN when there is no pipe or the pipe
//  refuses the write.
int zmq::session_base_t::push_msg (msg_t *msg_)
{
    if ((msg_->flags () & msg_t::command) != 0)
        return 0;

    if (pipe == NULL || !pipe->write (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    //  The written message now belongs to the pipe; hand the caller a
    //  freshly initialised one.
    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
//  Reads one message from the ZAP pipe into 'msg_'.
//  Returns 0 on success; -1 with errno ENOTCONN when no ZAP pipe exists;
//  -1 with errno EAGAIN when the ZAP pipe has nothing to read.
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
    if (!zap_pipe) {
        errno = ENOTCONN;
        return -1;
    }

    if (zap_pipe->read (msg_))
        return 0;

    errno = EAGAIN;
    return -1;
}

//  Writes 'msg_' to the ZAP pipe; the write itself is asserted to succeed.
//  The pipe is flushed once a message without the 'more' flag has been
//  written. 'msg_' is re-initialised before returning.
//  Returns 0 on success; -1 with errno ENOTCONN when no ZAP pipe exists.
int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
    if (!zap_pipe) {
        errno = ENOTCONN;
        return -1;
    }

    const bool ok = zap_pipe->write (msg_);
    zmq_assert (ok);

    const bool more_frames_follow = (msg_->flags () & msg_t::more) != 0;
    if (!more_frames_follow)
        zap_pipe->flush ();

    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
187 188 189 190
//  Called from reconnect () before a new connection attempt is started.
//  The base session keeps no state that needs resetting, so this does
//  nothing.
//  NOTE(review): presumably a hook overridden by specialised sessions
//  such as req_session_t; confirm against the class declarations.
void zmq::session_base_t::reset ()
{
}

191
void zmq::session_base_t::flush ()
192
{
193 194
    if (pipe)
        pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
195 196
}

197
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
198
{
Martin Hurton's avatar
Martin Hurton committed
199
    zmq_assert (pipe != NULL);
200

Martin Hurton's avatar
Martin Hurton committed
201 202 203 204
    //  Get rid of half-processed messages in the out pipe. Flush any
    //  unflushed messages upstream.
    pipe->rollback ();
    pipe->flush ();
205

Martin Hurton's avatar
Martin Hurton committed
206 207 208 209 210 211 212 213 214
    //  Remove any half-read message from the in pipe.
    while (incomplete_in) {
        msg_t msg;
        int rc = msg.init ();
        errno_assert (rc == 0);
        rc = pull_msg (&msg);
        errno_assert (rc == 0);
        rc = msg.close ();
        errno_assert (rc == 0);
215
    }
216 217
}

218
//  Pipe event: 'pipe_' has terminated. The session forgets its reference
//  to the pipe and, if it was only waiting for its pipes to go away,
//  completes its own termination.
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
{
    //  The pipe has to be one this session knows about: the data pipe,
    //  the ZAP pipe, or a pipe detached earlier by reconnect ().
    zmq_assert (pipe_ == pipe
             || pipe_ == zap_pipe
             || terminating_pipes.count (pipe_) == 1);

    if (pipe_ == pipe) {
        //  The data pipe is gone, so a pending linger timer is disarmed
        //  along with it.
        pipe = NULL;
        if (has_linger_timer) {
            cancel_timer (linger_timer_id);
            has_linger_timer = false;
        }
    }
    else if (pipe_ == zap_pipe) {
        zap_pipe = NULL;
    }
    else {
        //  One of the previously detached pipes.
        terminating_pipes.erase (pipe_);
    }

    //  With the raw_socket option set, losing a pipe outside of regular
    //  termination shuts down the engine and then the session itself.
    if (!is_terminating () && options.raw_socket) {
        if (engine != NULL) {
            engine->terminate ();
            engine = NULL;
        }
        terminate ();
    }

    //  If process_term () deferred termination until the pipes were gone
    //  and this was the last one, finish terminating now.
    if (pending) {
        const bool no_pipes_left =
            !pipe && !zap_pipe && terminating_pipes.empty ();
        if (no_pipes_left) {
            pending = false;
            own_t::process_term (0);
        }
    }
}

257
//  Pipe event: 'pipe_' has something to read.
//  Notifications from pipes that are neither the data pipe nor the ZAP
//  pipe are ignored (such a pipe must be in 'terminating_pipes').
//  Without an engine the data pipe is merely checked for readable content;
//  with an engine, output is restarted for the data pipe, or the engine is
//  told a ZAP message is available for the ZAP pipe.
void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
    // Skip activating if we're detaching this pipe
    if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) {
        zmq_assert (terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (unlikely (engine == NULL)) {
        //  The notification may have come from the ZAP pipe while the
        //  data pipe is absent (it is set to NULL by pipe_terminated ()
        //  and reconnect ()), so the data pipe must not be dereferenced
        //  unconditionally here.
        if (pipe)
            pipe->check_read ();
        return;
    }

    if (likely (pipe_ == pipe))
        engine->restart_output ();
    else
        //  pipe_ == zap_pipe
        engine->zap_msg_available ();
}

276
//  Pipe event: 'pipe_' can be written to again. Only the current data
//  pipe is of interest; any other pipe must be one that is being
//  detached. If an engine is attached, its input is restarted.
void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
    //  Notifications from a pipe being detached are ignored.
    if (pipe_ != pipe) {
        zmq_assert (terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (engine == NULL)
        return;

    engine->restart_input ();
}

288
//  Pipe event that must never reach a session: hiccups travel from the
//  session to the socket only, so receiving one is treated as a fatal
//  logic error.
void zmq::session_base_t::hiccuped (pipe_t *)
{
    zmq_assert (false);
}

295
//  Accessor for the socket this session was created with.
zmq::socket_base_t *zmq::session_base_t::get_socket ()
{
    return socket;
}

300
void zmq::session_base_t::process_plug ()
301
{
Martin Hurton's avatar
Martin Hurton committed
302
    if (active)
303
        start_connecting (false);
304 305
}

306 307 308 309 310 311 312 313 314 315
int zmq::session_base_t::zap_connect ()
{
    zmq_assert (zap_pipe == NULL);

    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
    if (peer.socket == NULL) {
        errno = ECONNREFUSED;
        return -1;
    }
    if (peer.options.type != ZMQ_REP
316 317
    &&  peer.options.type != ZMQ_ROUTER
    &&  peer.options.type != ZMQ_SERVER) {
318 319 320 321 322 323 324 325 326
        errno = ECONNREFUSED;
        return -1;
    }

    //  Create a bi-directional pipe that will connect
    //  session with zap socket.
    object_t *parents [2] = {this, peer.socket};
    pipe_t *new_pipes [2] = {NULL, NULL};
    int hwms [2] = {0, 0};
327
    bool conflates [2] = {false, false};
Ian Barber's avatar
Ian Barber committed
328
    int rc = pipepair (parents, new_pipes, hwms, conflates);
329 330 331 332
    errno_assert (rc == 0);

    //  Attach local end of the pipe to this socket object.
    zap_pipe = new_pipes [0];
Ian Barber's avatar
Ian Barber committed
333
    zap_pipe->set_nodelay ();
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
    zap_pipe->set_event_sink (this);

    send_bind (peer.socket, new_pipes [1], false);

    //  Send empty identity if required by the peer.
    if (peer.options.recv_identity) {
        msg_t id;
        rc = id.init ();
        errno_assert (rc == 0);
        id.set_flags (msg_t::identity);
        bool ok = zap_pipe->write (&id);
        zmq_assert (ok);
        zap_pipe->flush ();
    }

    return 0;
}

Min RK's avatar
Min RK committed
352 353 354 355 356 357 358 359
//  Returns true when the configured mechanism is anything other than
//  ZMQ_NULL, or when the mechanism is ZMQ_NULL and a non-empty ZAP domain
//  has been set.
bool zmq::session_base_t::zap_enabled ()
{
    if (options.mechanism != ZMQ_NULL)
        return true;

    //  Mechanism is ZMQ_NULL here; only the ZAP domain decides.
    return options.zap_domain.length() > 0;
}

360
//  Handler for the 'attach' command: takes over 'engine_' and plugs it
//  into the session's I/O thread. If the session has no data pipe yet and
//  is not terminating, the pipe to the socket is created first.
void zmq::session_base_t::process_attach (i_engine *engine_)
{
    zmq_assert (engine_ != NULL);

    if (!pipe && !is_terminating ()) {

        //  Conflation applies only when requested through the options and
        //  only for the socket types listed here.
        const bool conflatable_type = options.type == ZMQ_DEALER
                                   || options.type == ZMQ_PULL
                                   || options.type == ZMQ_PUSH
                                   || options.type == ZMQ_PUB
                                   || options.type == ZMQ_SUB;
        const bool conflate = options.conflate && conflatable_type;

        //  A conflating pipe gets -1 for both high-water marks; otherwise
        //  the configured receive/send marks are used.
        int hwms [2] = {options.rcvhwm, options.sndhwm};
        if (conflate) {
            hwms [0] = -1;
            hwms [1] = -1;
        }
        bool conflates [2] = {conflate, conflate};

        object_t *parents [2] = {this, socket};
        pipe_t *pipes [2] = {NULL, NULL};
        int rc = pipepair (parents, pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  The local end reports its events to this session and becomes
        //  the session's data pipe.
        pipe_t *const local_end = pipes [0];
        local_end->set_event_sink (this);
        zmq_assert (!pipe);
        pipe = local_end;

        //  The remote end goes to the socket.
        send_bind (socket, pipes [1]);
    }

    //  Only one engine may be attached at a time.
    zmq_assert (!engine);
    engine = engine_;
    engine->plug (io_thread, this);
}

399 400
//  Called when the attached engine has failed with 'reason'.
//  The engine reference is dropped and the pipes are brought back to a
//  message boundary. Then, for timeout and connection errors, an active
//  session reconnects while any other session terminates; for a protocol
//  error the session always terminates.
void zmq::session_base_t::engine_error (
        zmq::stream_engine_t::error_reason_t reason)
{
    //  The engine is gone; do not touch it again.
    engine = NULL;

    //  Discard partially transferred messages.
    if (pipe != NULL)
        clean_pipes ();

    zmq_assert (reason == stream_engine_t::connection_error
             || reason == stream_engine_t::timeout_error
             || reason == stream_engine_t::protocol_error);

    const bool link_failure = reason == stream_engine_t::timeout_error
                           || reason == stream_engine_t::connection_error;

    if (link_failure && active)
        reconnect ();
    else
    if (link_failure || reason == stream_engine_t::protocol_error)
        terminate ();

    //  With the engine gone nobody reads the pipes any more; check them
    //  explicitly in case only a delimiter is left inside.
    if (pipe != NULL)
        pipe->check_read ();

    if (zap_pipe != NULL)
        zap_pipe->check_read ();
}

434
void zmq::session_base_t::process_term (int linger_)
435
{
436 437 438 439
    zmq_assert (!pending);

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
Ian Barber's avatar
Ian Barber committed
440
    //  standard termination immediately.
Martin Hurton's avatar
Martin Hurton committed
441 442
    if (!pipe && !zap_pipe && terminating_pipes.empty ()) {
        own_t::process_term (0);
443 444 445 446 447
        return;
    }

    pending = true;

448 449 450 451 452 453 454 455 456 457 458 459 460
    if (pipe != NULL) {
        //  If there's finite linger value, delay the termination.
        //  If linger is infinite (negative) we don't even have to set
        //  the timer.
        if (linger_ > 0) {
            zmq_assert (!has_linger_timer);
            add_timer (linger_, linger_timer_id);
            has_linger_timer = true;
        }

        //  Start pipe termination process. Delay the termination till all messages
        //  are processed in case the linger time is non-zero.
        pipe->terminate (linger_ != 0);
461

462 463 464
        //  TODO: Should this go into pipe_t::terminate ?
        //  In case there's no engine and there's only delimiter in the
        //  pipe it wouldn't be ever read. Thus we check for it explicitly.
465 466
        if (!engine)
            pipe->check_read ();
467
    }
468

469 470
    if (zap_pipe != NULL)
        zap_pipe->terminate (false);
471 472
}

473
void zmq::session_base_t::timer_event (int id_)
474 475 476 477 478
{
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
    has_linger_timer = false;
479 480 481

    //  Ask pipe to terminate even though there may be pending messages in it.
    zmq_assert (pipe);
482
    pipe->terminate (false);
483 484
}

Martin Hurton's avatar
Martin Hurton committed
485
void zmq::session_base_t::reconnect ()
486
{
487 488
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
Martin Hurton's avatar
Martin Hurton committed
489 490
    if (pipe && options.immediate == 1
        && addr->protocol != "pgm" && addr->protocol != "epgm"
bebopagogo's avatar
bebopagogo committed
491
        && addr->protocol != "norm") {
492 493
        pipe->hiccup ();
        pipe->terminate (false);
494
        terminating_pipes.insert (pipe);
495 496 497
        pipe = NULL;
    }

498
    reset ();
499

500
    //  Reconnect.
Sergey KHripchenko's avatar
Sergey KHripchenko committed
501 502
    if (options.reconnect_ivl != -1)
        start_connecting (true);
503

504 505 506
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
    if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
Sergey KHripchenko's avatar
Sergey KHripchenko committed
507
        pipe->hiccup ();
508 509
}

510
void zmq::session_base_t::start_connecting (bool wait_)
511
{
Martin Hurton's avatar
Martin Hurton committed
512
    zmq_assert (active);
513 514 515 516 517 518 519 520

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.

521
    if (addr->protocol == "tcp") {
522
        if (!options.socks_proxy_address.empty()) {
523
            address_t *proxy_address = new (std::nothrow)
Ilya Kulakov's avatar
Ilya Kulakov committed
524
                address_t ("tcp", options.socks_proxy_address, this->get_ctx ());
525 526 527 528 529 530 531 532 533 534 535 536 537
            alloc_assert (proxy_address);
            socks_connecter_t *connecter =
                new (std::nothrow) socks_connecter_t (
                    io_thread, this, options, addr, proxy_address, wait_);
            alloc_assert (connecter);
            launch_child (connecter);
        }
        else {
            tcp_connecter_t *connecter = new (std::nothrow)
                tcp_connecter_t (io_thread, this, options, addr, wait_);
            alloc_assert (connecter);
            launch_child (connecter);
        }
538 539 540
        return;
    }

541
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
542
    if (addr->protocol == "ipc") {
543
        ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
544
            io_thread, this, options, addr, wait_);
545 546 547 548
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }
549
#endif
550
#if defined ZMQ_HAVE_TIPC
551 552 553 554
    if (addr->protocol == "tipc") {
        tipc_connecter_t *connecter = new (std::nothrow) tipc_connecter_t (
            io_thread, this, options, addr, wait_);
        alloc_assert (connecter);
Martin Hurton's avatar
Martin Hurton committed
555
        launch_child (connecter);
556 557 558 559
        return;
    }
#endif

560
#ifdef ZMQ_HAVE_OPENPGM
561

562
    //  Both PGM and EPGM transports are using the same infrastructure.
563
    if (addr->protocol == "pgm" || addr->protocol == "epgm") {
564

565 566 567
        zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
                 || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

568
        //  For EPGM transport with UDP encapsulation of PGM is used.
569
        bool const udp_encapsulation = addr->protocol == "epgm";
570 571 572 573 574 575 576

        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with PGM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {

            //  PGM sender.
577
            pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
578 579 580
                io_thread, options);
            alloc_assert (pgm_sender);

581
            int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
582
            errno_assert (rc == 0);
583 584 585

            send_attach (this, pgm_sender);
        }
586
        else {
587 588

            //  PGM receiver.
589
            pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
590 591 592
                io_thread, options);
            alloc_assert (pgm_receiver);

593
            int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
594
            errno_assert (rc == 0);
595 596 597 598 599 600 601

            send_attach (this, pgm_receiver);
        }

        return;
    }
#endif
Martin Hurton's avatar
Martin Hurton committed
602

bebopagogo's avatar
bebopagogo committed
603
#ifdef ZMQ_HAVE_NORM
Martin Hurton's avatar
Martin Hurton committed
604
    if (addr->protocol == "norm") {
bebopagogo's avatar
bebopagogo committed
605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632
        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with NORM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {

            //  NORM sender.
            norm_engine_t* norm_sender = new (std::nothrow) norm_engine_t(io_thread, options);
            alloc_assert (norm_sender);

            int rc = norm_sender->init (addr->address.c_str (), true, false);
            errno_assert (rc == 0);

            send_attach (this, norm_sender);
        }
        else {  // ZMQ_SUB or ZMQ_XSUB

            //  NORM receiver.
            norm_engine_t* norm_receiver = new (std::nothrow) norm_engine_t (io_thread, options);
            alloc_assert (norm_receiver);

            int rc = norm_receiver->init (addr->address.c_str (), false, true);
            errno_assert (rc == 0);

            send_attach (this, norm_receiver);
        }
        return;
    }
#endif // ZMQ_HAVE_NORM
633

Ilya Kulakov's avatar
Ilya Kulakov committed
634 635 636 637 638 639 640 641 642 643
#if defined ZMQ_HAVE_VMCI
    if (addr->protocol == "vmci") {
        vmci_connecter_t *connecter = new (std::nothrow) vmci_connecter_t (
                io_thread, this, options, addr, wait_);
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }
#endif

644 645
    zmq_assert (false);
}
646