session_base.cpp 17.4 KB
Newer Older
1
/*
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "session_base.hpp"
31
#include "i_engine.hpp"
32
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
33
#include "pipe.hpp"
34
#include "likely.hpp"
35
#include "tcp_connecter.hpp"
36
#include "ipc_connecter.hpp"
37
#include "tipc_connecter.hpp"
38
#include "socks_connecter.hpp"
39 40
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
41
#include "address.hpp"
bebopagogo's avatar
bebopagogo committed
42
#include "norm_engine.hpp"
43

44
#include "ctx.hpp"
45 46 47
#include "req.hpp"

//  Factory for session objects. A REQ socket gets a req_session_t; every
//  other socket type listed below gets a plain session_base_t. For any
//  other socket type errno is set to EINVAL and NULL is returned.
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
    bool active_, class socket_base_t *socket_, const options_t &options_,
    address_t *addr_)
{
    session_base_t *session = NULL;

    switch (options_.type) {
        case ZMQ_REQ:
            //  REQ has its own session type.
            session = new (std::nothrow) req_session_t (
                io_thread_, active_, socket_, options_, addr_);
            break;

        case ZMQ_DEALER:
        case ZMQ_REP:
        case ZMQ_ROUTER:
        case ZMQ_PUB:
        case ZMQ_XPUB:
        case ZMQ_SUB:
        case ZMQ_XSUB:
        case ZMQ_PUSH:
        case ZMQ_PULL:
        case ZMQ_PAIR:
        case ZMQ_STREAM:
        case ZMQ_SERVER:
        case ZMQ_CLIENT:
            //  All of these share the generic session.
            session = new (std::nothrow) session_base_t (
                io_thread_, active_, socket_, options_, addr_);
            break;

        default:
            //  Unknown socket type.
            errno = EINVAL;
            return NULL;
    }

    //  The nothrow allocation result is handed to alloc_assert.
    alloc_assert (session);
    return session;
}

//  Stores the construction arguments. A fresh session has no pipe, no ZAP
//  pipe, no engine and no linger timer, and is neither in the middle of
//  an inbound multi-part message nor waiting for termination.
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
      bool active_, class socket_base_t *socket_, const options_t &options_,
      address_t *addr_) :
    own_t (io_thread_, options_), io_object_t (io_thread_),
    active (active_),
    pipe (NULL), zap_pipe (NULL),
    incomplete_in (false), pending (false),
    engine (NULL),
    socket (socket_), io_thread (io_thread_),
    has_linger_timer (false),
    addr (addr_)
{
    //  All state is set up in the initializer list above.
}

99
//  Both pipes must already be gone when the session is destroyed. Any
//  leftover linger timer is cancelled, a still-attached engine is
//  terminated and the address object is deleted.
zmq::session_base_t::~session_base_t ()
{
    zmq_assert (!pipe);
    zmq_assert (!zap_pipe);

    //  Drop the linger timer if it is still armed.
    if (has_linger_timer) {
        cancel_timer (linger_timer_id);
        has_linger_timer = false;
    }

    //  Shut the engine down, if one is attached.
    if (engine != NULL)
        engine->terminate ();

    //  The session deletes the address it was constructed with.
    delete addr;
}
116

117
//  Installs pipe_ as the session's message pipe and registers the session
//  as its event sink. Only valid while the session is not terminating and
//  has no pipe yet; pipe_ must not be NULL.
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
{
    zmq_assert (!is_terminating ());
    zmq_assert (!pipe);
    zmq_assert (pipe_);

    pipe = pipe_;
    pipe_->set_event_sink (this);
}

126
//  Reads the next message from the pipe into msg_.
//  Returns 0 on success. Returns -1 with errno set to EAGAIN when there
//  is no pipe or the pipe has nothing to read.
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
    const bool got_message = pipe != NULL && pipe->read (msg_);
    if (!got_message) {
        errno = EAGAIN;
        return -1;
    }

    //  Remember whether further parts of this message are still to come.
    incomplete_in = (msg_->flags () & msg_t::more) != 0;

    return 0;
}

138
//  Writes msg_ to the pipe. On success msg_ is re-initialised and 0 is
//  returned. Returns -1 with errno set to EAGAIN when there is no pipe or
//  the pipe refuses the write.
int zmq::session_base_t::push_msg (msg_t *msg_)
{
    if (!pipe || !pipe->write (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    //  The write succeeded; reset msg_ before handing it back.
    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
//  Reads one message from the ZAP pipe into msg_.
//  Returns 0 on success; -1 with errno ENOTCONN when there is no ZAP pipe;
//  -1 with errno EAGAIN when the ZAP pipe has nothing to read.
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
    if (!zap_pipe) {
        errno = ENOTCONN;
        return -1;
    }

    if (zap_pipe->read (msg_))
        return 0;

    errno = EAGAIN;
    return -1;
}

//  Writes msg_ to the ZAP pipe and re-initialises msg_ afterwards.
//  Returns 0 on success, or -1 with errno ENOTCONN when there is no ZAP
//  pipe. A write the pipe refuses is treated as an assertion failure.
int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
    if (!zap_pipe) {
        errno = ENOTCONN;
        return -1;
    }

    const bool ok = zap_pipe->write (msg_);
    zmq_assert (ok);

    //  Flush only once the final part (no 'more' flag) has been written.
    const bool final_part = (msg_->flags () & msg_t::more) == 0;
    if (final_part)
        zap_pipe->flush ();

    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
183 184 185 186
//  Called from reconnect (). The base session keeps no state that needs
//  resetting, so this does nothing.
//  NOTE(review): presumably a hook for derived session types - confirm
//  against the class declaration.
void zmq::session_base_t::reset ()
{
}

187
void zmq::session_base_t::flush ()
188
{
189 190
    if (pipe)
        pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
191 192
}

193
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
194
{
Martin Hurton's avatar
Martin Hurton committed
195
    zmq_assert (pipe != NULL);
196

Martin Hurton's avatar
Martin Hurton committed
197 198 199 200
    //  Get rid of half-processed messages in the out pipe. Flush any
    //  unflushed messages upstream.
    pipe->rollback ();
    pipe->flush ();
201

Martin Hurton's avatar
Martin Hurton committed
202 203 204 205 206 207 208 209 210
    //  Remove any half-read message from the in pipe.
    while (incomplete_in) {
        msg_t msg;
        int rc = msg.init ();
        errno_assert (rc == 0);
        rc = pull_msg (&msg);
        errno_assert (rc == 0);
        rc = msg.close ();
        errno_assert (rc == 0);
211
    }
212 213
}

214
//  Pipe event: pipe_ has finished terminating. Forgets the pipe, tears
//  the session down for raw sockets, and completes a pending termination
//  once no pipes remain.
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
{
    // Drop the reference to the deallocated pipe if required.
    zmq_assert (pipe_ == pipe
             || pipe_ == zap_pipe
             || terminating_pipes.count (pipe_) == 1);

    if (pipe_ == pipe) {
        //  Our current data pipe is gone; the linger timer, if armed,
        //  is no longer needed.
        pipe = NULL;
        if (has_linger_timer) {
            cancel_timer (linger_timer_id);
            has_linger_timer = false;
        }
    }
    else if (pipe_ == zap_pipe) {
        zap_pipe = NULL;
    }
    else {
        //  One of the pipes detached earlier.
        terminating_pipes.erase (pipe_);
    }

    //  With raw sockets a terminated pipe ends the whole session, unless
    //  it is already terminating.
    if (!is_terminating () && options.raw_socket) {
        if (engine != NULL) {
            engine->terminate ();
            engine = NULL;
        }
        terminate ();
    }

    //  If termination was deferred until the pipes drained, and the last
    //  pipe has now gone, nothing more can arrive and termination can
    //  proceed.
    const bool all_pipes_gone =
        !pipe && !zap_pipe && terminating_pipes.empty ();
    if (pending && all_pipes_gone) {
        pending = false;
        own_t::process_term (0);
    }
}

253
//  Pipe event: pipe_ has become readable. Notifications from pipes that
//  are being detached are ignored. With an engine attached, the engine is
//  told either to restart output (data pipe) or that a ZAP message is
//  available (ZAP pipe).
void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
    // Skip activating if we're detaching this pipe
    if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) {
        zmq_assert (terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (unlikely (engine == NULL)) {
        //  No engine to notify. The check above also lets the ZAP pipe
        //  through, and in that case the data pipe may be NULL (e.g.
        //  reconnect () sets it to NULL), so guard the dereference.
        if (pipe)
            pipe->check_read ();
        return;
    }

    if (likely (pipe_ == pipe))
        engine->restart_output ();
    else
        engine->zap_msg_available ();
}

272
//  Pipe event: pipe_ has become writable. Only the current data pipe is
//  acted on; any other pipe must be one that is being detached. If an
//  engine is attached it is asked to restart input.
void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
    //  Ignore notifications from a pipe we are detaching.
    if (pipe_ != pipe) {
        zmq_assert (terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (engine != NULL)
        engine->restart_input ();
}

284
//  Pipe event that must never reach a session: hiccups travel from the
//  session to the socket only, so receiving one is an assertion failure.
void zmq::session_base_t::hiccuped (pipe_t *)
{
    zmq_assert (false);
}

291
//  Returns the socket pointer this session was constructed with.
zmq::socket_base_t *zmq::session_base_t::get_socket ()
{
    return socket;
}

296
void zmq::session_base_t::process_plug ()
297
{
Martin Hurton's avatar
Martin Hurton committed
298
    if (active)
299
        start_connecting (false);
300 301
}

302 303 304 305 306 307 308 309 310 311
int zmq::session_base_t::zap_connect ()
{
    zmq_assert (zap_pipe == NULL);

    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
    if (peer.socket == NULL) {
        errno = ECONNREFUSED;
        return -1;
    }
    if (peer.options.type != ZMQ_REP
312 313
    &&  peer.options.type != ZMQ_ROUTER
    &&  peer.options.type != ZMQ_SERVER) {
314 315 316 317 318 319 320 321 322
        errno = ECONNREFUSED;
        return -1;
    }

    //  Create a bi-directional pipe that will connect
    //  session with zap socket.
    object_t *parents [2] = {this, peer.socket};
    pipe_t *new_pipes [2] = {NULL, NULL};
    int hwms [2] = {0, 0};
323
    bool conflates [2] = {false, false};
Ian Barber's avatar
Ian Barber committed
324
    int rc = pipepair (parents, new_pipes, hwms, conflates);
325 326 327 328
    errno_assert (rc == 0);

    //  Attach local end of the pipe to this socket object.
    zap_pipe = new_pipes [0];
Ian Barber's avatar
Ian Barber committed
329
    zap_pipe->set_nodelay ();
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
    zap_pipe->set_event_sink (this);

    send_bind (peer.socket, new_pipes [1], false);

    //  Send empty identity if required by the peer.
    if (peer.options.recv_identity) {
        msg_t id;
        rc = id.init ();
        errno_assert (rc == 0);
        id.set_flags (msg_t::identity);
        bool ok = zap_pipe->write (&id);
        zmq_assert (ok);
        zap_pipe->flush ();
    }

    return 0;
}

Min RK's avatar
Min RK committed
348 349 350 351 352 353 354 355
//  Returns true when the configured mechanism is anything other than
//  ZMQ_NULL, or when the mechanism is ZMQ_NULL and a non-empty ZAP domain
//  is set.
bool zmq::session_base_t::zap_enabled ()
{
    if (options.mechanism != ZMQ_NULL)
        return true;

    //  Mechanism is ZMQ_NULL: only a non-empty domain enables ZAP.
    return options.zap_domain.length() > 0;
}

356
//  Attaches engine_ to the session. If the session has no pipe yet (and is
//  not terminating), a pipe pair to the socket is created first. The
//  session must not already have an engine; engine_ must not be NULL.
void zmq::session_base_t::process_attach (i_engine *engine_)
{
    zmq_assert (engine_ != NULL);

    //  Create the pipe if it does not exist yet.
    if (!pipe && !is_terminating ()) {
        object_t *parents [2] = {this, socket};
        pipe_t *pipes [2] = {NULL, NULL};

        //  Conflation is honoured only for these socket types.
        bool conflate = false;
        if (options.conflate) {
            switch (options.type) {
                case ZMQ_DEALER:
                case ZMQ_PULL:
                case ZMQ_PUSH:
                case ZMQ_PUB:
                case ZMQ_SUB:
                    conflate = true;
                    break;
                default:
                    break;
            }
        }

        //  A conflating pipe gets -1 for both high-water marks; otherwise
        //  the configured receive/send marks apply.
        int hwms [2] = {options.rcvhwm, options.sndhwm};
        if (conflate) {
            hwms [0] = -1;
            hwms [1] = -1;
        }
        bool conflates [2] = {conflate, conflate};

        int rc = pipepair (parents, pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  Listen for events on the local end.
        pipes [0]->set_event_sink (this);

        //  Keep the local end as the session's pipe.
        zmq_assert (!pipe);
        pipe = pipes [0];

        //  The socket gets the remote end.
        send_bind (socket, pipes [1]);
    }

    //  Plug in the engine.
    zmq_assert (!engine);
    engine = engine_;
    engine->plug (io_thread, this);
}

395 396
//  Called when the engine has failed. The engine pointer is dropped, any
//  half-transferred messages are cleaned out of the pipe, and the session
//  either reconnects or terminates depending on the reason and on whether
//  it is an active session.
void zmq::session_base_t::engine_error (
        zmq::stream_engine_t::error_reason_t reason)
{
    //  The engine is gone; stop referring to it.
    engine = NULL;

    //  Throw away partially sent/received messages.
    if (pipe != NULL)
        clean_pipes ();

    zmq_assert (reason == stream_engine_t::connection_error
             || reason == stream_engine_t::timeout_error
             || reason == stream_engine_t::protocol_error);

    if (reason == stream_engine_t::timeout_error
    ||  reason == stream_engine_t::connection_error) {
        //  Timeouts and connection errors are retried by active sessions;
        //  other sessions terminate.
        if (active)
            reconnect ();
        else
            terminate ();
    }
    else if (reason == stream_engine_t::protocol_error) {
        //  Protocol errors always end the session.
        terminate ();
    }

    //  Just in case there's only a delimiter in the pipe.
    if (pipe != NULL)
        pipe->check_read ();

    if (zap_pipe != NULL)
        zap_pipe->check_read ();
}

430
void zmq::session_base_t::process_term (int linger_)
431
{
432 433 434 435
    zmq_assert (!pending);

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
Ian Barber's avatar
Ian Barber committed
436
    //  standard termination immediately.
Martin Hurton's avatar
Martin Hurton committed
437 438
    if (!pipe && !zap_pipe && terminating_pipes.empty ()) {
        own_t::process_term (0);
439 440 441 442 443
        return;
    }

    pending = true;

444 445 446 447 448 449 450 451 452 453 454 455 456
    if (pipe != NULL) {
        //  If there's finite linger value, delay the termination.
        //  If linger is infinite (negative) we don't even have to set
        //  the timer.
        if (linger_ > 0) {
            zmq_assert (!has_linger_timer);
            add_timer (linger_, linger_timer_id);
            has_linger_timer = true;
        }

        //  Start pipe termination process. Delay the termination till all messages
        //  are processed in case the linger time is non-zero.
        pipe->terminate (linger_ != 0);
457

458 459 460 461 462
        //  TODO: Should this go into pipe_t::terminate ?
        //  In case there's no engine and there's only delimiter in the
        //  pipe it wouldn't be ever read. Thus we check for it explicitly.
        pipe->check_read ();
    }
463

464 465
    if (zap_pipe != NULL)
        zap_pipe->terminate (false);
466 467
}

468
void zmq::session_base_t::timer_event (int id_)
469 470 471 472 473
{
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
    has_linger_timer = false;
474 475 476

    //  Ask pipe to terminate even though there may be pending messages in it.
    zmq_assert (pipe);
477
    pipe->terminate (false);
478 479
}

Martin Hurton's avatar
Martin Hurton committed
480
void zmq::session_base_t::reconnect ()
481
{
482 483
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
Martin Hurton's avatar
Martin Hurton committed
484 485
    if (pipe && options.immediate == 1
        && addr->protocol != "pgm" && addr->protocol != "epgm"
bebopagogo's avatar
bebopagogo committed
486
        && addr->protocol != "norm") {
487 488
        pipe->hiccup ();
        pipe->terminate (false);
489
        terminating_pipes.insert (pipe);
490 491 492
        pipe = NULL;
    }

493
    reset ();
494

495
    //  Reconnect.
Sergey KHripchenko's avatar
Sergey KHripchenko committed
496 497
    if (options.reconnect_ivl != -1)
        start_connecting (true);
498

499 500 501
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
    if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
Sergey KHripchenko's avatar
Sergey KHripchenko committed
502
        pipe->hiccup ();
503 504
}

505
void zmq::session_base_t::start_connecting (bool wait_)
506
{
Martin Hurton's avatar
Martin Hurton committed
507
    zmq_assert (active);
508 509 510 511 512 513 514 515

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.

516
    if (addr->protocol == "tcp") {
517
        if (!options.socks_proxy_address.empty()) {
518 519 520 521 522 523 524 525 526 527 528 529 530 531 532
            address_t *proxy_address = new (std::nothrow)
                address_t ("tcp", options.socks_proxy_address);
            alloc_assert (proxy_address);
            socks_connecter_t *connecter =
                new (std::nothrow) socks_connecter_t (
                    io_thread, this, options, addr, proxy_address, wait_);
            alloc_assert (connecter);
            launch_child (connecter);
        }
        else {
            tcp_connecter_t *connecter = new (std::nothrow)
                tcp_connecter_t (io_thread, this, options, addr, wait_);
            alloc_assert (connecter);
            launch_child (connecter);
        }
533 534 535
        return;
    }

536
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
537
    if (addr->protocol == "ipc") {
538
        ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
539
            io_thread, this, options, addr, wait_);
540 541 542 543
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }
544
#endif
545
#if defined ZMQ_HAVE_TIPC
546 547 548 549
    if (addr->protocol == "tipc") {
        tipc_connecter_t *connecter = new (std::nothrow) tipc_connecter_t (
            io_thread, this, options, addr, wait_);
        alloc_assert (connecter);
Martin Hurton's avatar
Martin Hurton committed
550
        launch_child (connecter);
551 552 553 554
        return;
    }
#endif

555
#ifdef ZMQ_HAVE_OPENPGM
556

557
    //  Both PGM and EPGM transports are using the same infrastructure.
558
    if (addr->protocol == "pgm" || addr->protocol == "epgm") {
559

560 561 562
        zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
                 || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

563
        //  For EPGM transport with UDP encapsulation of PGM is used.
564
        bool const udp_encapsulation = addr->protocol == "epgm";
565 566 567 568 569 570 571

        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with PGM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {

            //  PGM sender.
572
            pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
573 574 575
                io_thread, options);
            alloc_assert (pgm_sender);

576
            int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
577
            errno_assert (rc == 0);
578 579 580

            send_attach (this, pgm_sender);
        }
581
        else {
582 583

            //  PGM receiver.
584
            pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
585 586 587
                io_thread, options);
            alloc_assert (pgm_receiver);

588
            int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
589
            errno_assert (rc == 0);
590 591 592 593 594 595 596

            send_attach (this, pgm_receiver);
        }

        return;
    }
#endif
Martin Hurton's avatar
Martin Hurton committed
597

bebopagogo's avatar
bebopagogo committed
598
#ifdef ZMQ_HAVE_NORM
Martin Hurton's avatar
Martin Hurton committed
599
    if (addr->protocol == "norm") {
bebopagogo's avatar
bebopagogo committed
600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627
        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with NORM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {

            //  NORM sender.
            norm_engine_t* norm_sender = new (std::nothrow) norm_engine_t(io_thread, options);
            alloc_assert (norm_sender);

            int rc = norm_sender->init (addr->address.c_str (), true, false);
            errno_assert (rc == 0);

            send_attach (this, norm_sender);
        }
        else {  // ZMQ_SUB or ZMQ_XSUB

            //  NORM receiver.
            norm_engine_t* norm_receiver = new (std::nothrow) norm_engine_t (io_thread, options);
            alloc_assert (norm_receiver);

            int rc = norm_receiver->init (addr->address.c_str (), false, true);
            errno_assert (rc == 0);

            send_attach (this, norm_receiver);
        }
        return;
    }
#endif // ZMQ_HAVE_NORM
628 629 630

    zmq_assert (false);
}
631