session_base.cpp 17.4 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "session_base.hpp"
31
#include "i_engine.hpp"
32
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
33
#include "pipe.hpp"
34
#include "likely.hpp"
35
#include "tcp_connecter.hpp"
36
#include "ipc_connecter.hpp"
37
#include "tipc_connecter.hpp"
38
#include "socks_connecter.hpp"
39 40
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
41
#include "address.hpp"
bebopagogo's avatar
bebopagogo committed
42
#include "norm_engine.hpp"
43

44
#include "ctx.hpp"
45 46 47
#include "req.hpp"

zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
Martin Hurton's avatar
Martin Hurton committed
48
    bool active_, class socket_base_t *socket_, const options_t &options_,
49
    address_t *addr_)
50 51 52 53
{
    session_base_t *s = NULL;
    switch (options_.type) {
    case ZMQ_REQ:
Martin Hurton's avatar
Martin Hurton committed
54
        s = new (std::nothrow) req_session_t (io_thread_, active_,
55
            socket_, options_, addr_);
56
        break;
57
    case ZMQ_DEALER:
58
    case ZMQ_REP:
59
    case ZMQ_ROUTER:
60 61 62 63 64 65 66
    case ZMQ_PUB:
    case ZMQ_XPUB:
    case ZMQ_SUB:
    case ZMQ_XSUB:
    case ZMQ_PUSH:
    case ZMQ_PULL:
    case ZMQ_PAIR:
67
    case ZMQ_STREAM:
68
    case ZMQ_SERVER:
69
    case ZMQ_CLIENT:
Martin Hurton's avatar
Martin Hurton committed
70
        s = new (std::nothrow) session_base_t (io_thread_, active_,
71 72
            socket_, options_, addr_);
        break;
73 74 75 76 77 78 79 80 81
    default:
        errno = EINVAL;
        return NULL;
    }
    alloc_assert (s);
    return s;
}

zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
Martin Hurton's avatar
Martin Hurton committed
82
      bool active_, class socket_base_t *socket_, const options_t &options_,
83
      address_t *addr_) :
84 85
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
Martin Hurton's avatar
Martin Hurton committed
86
    active (active_),
87
    pipe (NULL),
88
    zap_pipe (NULL),
89
    incomplete_in (false),
90
    pending (false),
91
    engine (NULL),
92 93
    socket (socket_),
    io_thread (io_thread_),
94
    has_linger_timer (false),
95
    addr (addr_)
96
{
97 98
}

99
zmq::session_base_t::~session_base_t ()
100
{
101
    zmq_assert (!pipe);
102
    zmq_assert (!zap_pipe);
103

104 105 106 107 108 109
    //  If there's still a pending linger timer, remove it.
    if (has_linger_timer) {
        cancel_timer (linger_timer_id);
        has_linger_timer = false;
    }

110 111 112
    //  Close the engine.
    if (engine)
        engine->terminate ();
113

114
    delete addr;
115
}
116

117
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
118
{
119
    zmq_assert (!is_terminating ());
120 121 122 123
    zmq_assert (!pipe);
    zmq_assert (pipe_);
    pipe = pipe_;
    pipe->set_event_sink (this);
124 125
}

126
int zmq::session_base_t::pull_msg (msg_t *msg_)
127
{
128 129 130 131
    if (!pipe || !pipe->read (msg_)) {
        errno = EAGAIN;
        return -1;
    }
132

133
    incomplete_in = msg_->flags () & msg_t::more ? true : false;
134

135
    return 0;
136 137
}

138
int zmq::session_base_t::push_msg (msg_t *msg_)
139
{
Jonathan Reams's avatar
Jonathan Reams committed
140 141
    if(msg_->flags() & msg_t::command)
        return 0;
142
    if (pipe && pipe->write (msg_)) {
143 144
        int rc = msg_->init ();
        errno_assert (rc == 0);
145
        return 0;
146 147
    }

148 149
    errno = EAGAIN;
    return -1;
150 151
}

152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
    if (zap_pipe == NULL) {
        errno = ENOTCONN;
        return -1;
    }

    if (!zap_pipe->read (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    return 0;
}

int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
    if (zap_pipe == NULL) {
        errno = ENOTCONN;
        return -1;
    }

    const bool ok = zap_pipe->write (msg_);
    zmq_assert (ok);

    if ((msg_->flags () & msg_t::more) == 0)
        zap_pipe->flush ();

    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
185 186 187 188
void zmq::session_base_t::reset ()
{
}

189
void zmq::session_base_t::flush ()
190
{
191 192
    if (pipe)
        pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
193 194
}

195
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
196
{
Martin Hurton's avatar
Martin Hurton committed
197
    zmq_assert (pipe != NULL);
198

Martin Hurton's avatar
Martin Hurton committed
199 200 201 202
    //  Get rid of half-processed messages in the out pipe. Flush any
    //  unflushed messages upstream.
    pipe->rollback ();
    pipe->flush ();
203

Martin Hurton's avatar
Martin Hurton committed
204 205 206 207 208 209 210 211 212
    //  Remove any half-read message from the in pipe.
    while (incomplete_in) {
        msg_t msg;
        int rc = msg.init ();
        errno_assert (rc == 0);
        rc = pull_msg (&msg);
        errno_assert (rc == 0);
        rc = msg.close ();
        errno_assert (rc == 0);
213
    }
214 215
}

216
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
217
{
218
    // Drop the reference to the deallocated pipe if required.
219 220 221
    zmq_assert (pipe_ == pipe
             || pipe_ == zap_pipe
             || terminating_pipes.count (pipe_) == 1);
222

223
    if (pipe_ == pipe) {
224 225
        // If this is our current pipe, remove it
        pipe = NULL;
226 227 228 229
        if (has_linger_timer) {
            cancel_timer (linger_timer_id);
            has_linger_timer = false;
        }
230 231
    }
    else
Martin Hurton's avatar
Martin Hurton committed
232
    if (pipe_ == zap_pipe)
233
        zap_pipe = NULL;
234 235
    else
        // Remove the pipe from the detached pipes set
236
        terminating_pipes.erase (pipe_);
237

238
    if (!is_terminating () && options.raw_socket) {
Martin Hurton's avatar
Martin Hurton committed
239
        if (engine) {
240 241 242
            engine->terminate ();
            engine = NULL;
        }
Martin Hurton's avatar
Martin Hurton committed
243
        terminate ();
244 245
    }

Martin Hurton's avatar
Martin Hurton committed
246 247 248
    //  If we are waiting for pending messages to be sent, at this point
    //  we are sure that there will be no more messages and we can proceed
    //  with termination safely.
Martin Hurton's avatar
Martin Hurton committed
249 250 251 252
    if (pending && !pipe && !zap_pipe && terminating_pipes.empty ()) {
        pending = false;
        own_t::process_term (0);
    }
253 254
}

255
void zmq::session_base_t::read_activated (pipe_t *pipe_)
Martin Sustrik's avatar
Martin Sustrik committed
256
{
257
    // Skip activating if we're detaching this pipe
Martin Hurton's avatar
Martin Hurton committed
258
    if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) {
259
        zmq_assert (terminating_pipes.count (pipe_) == 1);
260
        return;
261
    }
262

263 264 265 266 267 268
    if (unlikely (engine == NULL)) {
        pipe->check_read ();
        return;
    }

    if (likely (pipe_ == pipe))
269
        engine->restart_output ();
270
    else
271
        engine->zap_msg_available ();
Martin Sustrik's avatar
Martin Sustrik committed
272 273
}

274
void zmq::session_base_t::write_activated (pipe_t *pipe_)
Martin Hurton's avatar
Martin Hurton committed
275
{
276
    // Skip activating if we're detaching this pipe
277 278
    if (pipe != pipe_) {
        zmq_assert (terminating_pipes.count (pipe_) == 1);
279
        return;
280
    }
281

Martin Hurton's avatar
Martin Hurton committed
282
    if (engine)
283
        engine->restart_input ();
Martin Hurton's avatar
Martin Hurton committed
284 285
}

286
void zmq::session_base_t::hiccuped (pipe_t *)
287 288 289 290 291 292
{
    //  Hiccups are always sent from session to socket, not the other
    //  way round.
    zmq_assert (false);
}

293
zmq::socket_base_t *zmq::session_base_t::get_socket ()
294
{
295
    return socket;
296 297
}

298
void zmq::session_base_t::process_plug ()
299
{
Martin Hurton's avatar
Martin Hurton committed
300
    if (active)
301
        start_connecting (false);
302 303
}

304 305 306 307 308 309 310 311 312 313
int zmq::session_base_t::zap_connect ()
{
    zmq_assert (zap_pipe == NULL);

    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
    if (peer.socket == NULL) {
        errno = ECONNREFUSED;
        return -1;
    }
    if (peer.options.type != ZMQ_REP
314 315
    &&  peer.options.type != ZMQ_ROUTER
    &&  peer.options.type != ZMQ_SERVER) {
316 317 318 319 320 321 322 323 324
        errno = ECONNREFUSED;
        return -1;
    }

    //  Create a bi-directional pipe that will connect
    //  session with zap socket.
    object_t *parents [2] = {this, peer.socket};
    pipe_t *new_pipes [2] = {NULL, NULL};
    int hwms [2] = {0, 0};
325
    bool conflates [2] = {false, false};
Ian Barber's avatar
Ian Barber committed
326
    int rc = pipepair (parents, new_pipes, hwms, conflates);
327 328 329 330
    errno_assert (rc == 0);

    //  Attach local end of the pipe to this socket object.
    zap_pipe = new_pipes [0];
Ian Barber's avatar
Ian Barber committed
331
    zap_pipe->set_nodelay ();
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
    zap_pipe->set_event_sink (this);

    send_bind (peer.socket, new_pipes [1], false);

    //  Send empty identity if required by the peer.
    if (peer.options.recv_identity) {
        msg_t id;
        rc = id.init ();
        errno_assert (rc == 0);
        id.set_flags (msg_t::identity);
        bool ok = zap_pipe->write (&id);
        zmq_assert (ok);
        zap_pipe->flush ();
    }

    return 0;
}

Min RK's avatar
Min RK committed
350 351 352 353 354 355 356 357
bool zmq::session_base_t::zap_enabled ()
{
    return (
         options.mechanism != ZMQ_NULL ||
        (options.mechanism == ZMQ_NULL && options.zap_domain.length() > 0)
    );
}

358
void zmq::session_base_t::process_attach (i_engine *engine_)
359
{
Martin Hurton's avatar
Martin Hurton committed
360
    zmq_assert (engine_ != NULL);
361

362
    //  Create the pipe if it does not exist yet.
363
    if (!pipe && !is_terminating ()) {
364 365
        object_t *parents [2] = {this, socket};
        pipe_t *pipes [2] = {NULL, NULL};
366 367 368 369 370 371 372 373 374 375 376

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : options.rcvhwm,
            conflate? -1 : options.sndhwm};
        bool conflates [2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
377
        int rc = pipepair (parents, pipes, hwms, conflates);
378 379 380 381 382 383
        errno_assert (rc == 0);

        //  Plug the local end of the pipe.
        pipes [0]->set_event_sink (this);

        //  Remember the local end of the pipe.
384
        zmq_assert (!pipe);
385
        pipe = pipes [0];
386

387
        //  Ask socket to plug into the remote end of the pipe.
388
        send_bind (socket, pipes [1]);
389 390
    }

391
    //  Plug in the engine.
392
    zmq_assert (!engine);
393
    engine = engine_;
394 395 396
    engine->plug (io_thread, this);
}

397 398
void zmq::session_base_t::engine_error (
        zmq::stream_engine_t::error_reason_t reason)
399 400 401 402
{
    //  Engine is dead. Let's forget about it.
    engine = NULL;

403
    //  Remove any half-done messages from the pipes.
Martin Hurton's avatar
Martin Hurton committed
404 405
    if (pipe)
        clean_pipes ();
406

407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
    zmq_assert (reason == stream_engine_t::connection_error
             || reason == stream_engine_t::timeout_error
             || reason == stream_engine_t::protocol_error);

    switch (reason) {
        case stream_engine_t::timeout_error:
        case stream_engine_t::connection_error:
            if (active)
                reconnect ();
            else
                terminate ();
            break;
        case stream_engine_t::protocol_error:
            terminate ();
            break;
    }
423

424
    //  Just in case there's only a delimiter in the pipe.
425 426
    if (pipe)
        pipe->check_read ();
427 428 429

    if (zap_pipe)
        zap_pipe->check_read ();
430 431
}

432
void zmq::session_base_t::process_term (int linger_)
433
{
434 435 436 437
    zmq_assert (!pending);

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
Ian Barber's avatar
Ian Barber committed
438
    //  standard termination immediately.
Martin Hurton's avatar
Martin Hurton committed
439 440
    if (!pipe && !zap_pipe && terminating_pipes.empty ()) {
        own_t::process_term (0);
441 442 443 444 445
        return;
    }

    pending = true;

446 447 448 449 450 451 452 453 454 455 456 457 458
    if (pipe != NULL) {
        //  If there's finite linger value, delay the termination.
        //  If linger is infinite (negative) we don't even have to set
        //  the timer.
        if (linger_ > 0) {
            zmq_assert (!has_linger_timer);
            add_timer (linger_, linger_timer_id);
            has_linger_timer = true;
        }

        //  Start pipe termination process. Delay the termination till all messages
        //  are processed in case the linger time is non-zero.
        pipe->terminate (linger_ != 0);
459

460 461 462 463 464
        //  TODO: Should this go into pipe_t::terminate ?
        //  In case there's no engine and there's only delimiter in the
        //  pipe it wouldn't be ever read. Thus we check for it explicitly.
        pipe->check_read ();
    }
465

466 467
    if (zap_pipe != NULL)
        zap_pipe->terminate (false);
468 469
}

470
void zmq::session_base_t::timer_event (int id_)
471 472 473 474 475
{
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
    has_linger_timer = false;
476 477 478

    //  Ask pipe to terminate even though there may be pending messages in it.
    zmq_assert (pipe);
479
    pipe->terminate (false);
480 481
}

Martin Hurton's avatar
Martin Hurton committed
482
void zmq::session_base_t::reconnect ()
483
{
484 485
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
Martin Hurton's avatar
Martin Hurton committed
486 487
    if (pipe && options.immediate == 1
        && addr->protocol != "pgm" && addr->protocol != "epgm"
bebopagogo's avatar
bebopagogo committed
488
        && addr->protocol != "norm") {
489 490
        pipe->hiccup ();
        pipe->terminate (false);
491
        terminating_pipes.insert (pipe);
492 493 494
        pipe = NULL;
    }

495
    reset ();
496

497
    //  Reconnect.
Sergey KHripchenko's avatar
Sergey KHripchenko committed
498 499
    if (options.reconnect_ivl != -1)
        start_connecting (true);
500

501 502 503
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
    if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
Sergey KHripchenko's avatar
Sergey KHripchenko committed
504
        pipe->hiccup ();
505 506
}

507
void zmq::session_base_t::start_connecting (bool wait_)
508
{
Martin Hurton's avatar
Martin Hurton committed
509
    zmq_assert (active);
510 511 512 513 514 515 516 517

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.

518
    if (addr->protocol == "tcp") {
519
        if (!options.socks_proxy_address.empty()) {
520 521 522 523 524 525 526 527 528 529 530 531 532 533 534
            address_t *proxy_address = new (std::nothrow)
                address_t ("tcp", options.socks_proxy_address);
            alloc_assert (proxy_address);
            socks_connecter_t *connecter =
                new (std::nothrow) socks_connecter_t (
                    io_thread, this, options, addr, proxy_address, wait_);
            alloc_assert (connecter);
            launch_child (connecter);
        }
        else {
            tcp_connecter_t *connecter = new (std::nothrow)
                tcp_connecter_t (io_thread, this, options, addr, wait_);
            alloc_assert (connecter);
            launch_child (connecter);
        }
535 536 537
        return;
    }

538
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
539
    if (addr->protocol == "ipc") {
540
        ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
541
            io_thread, this, options, addr, wait_);
542 543 544 545
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }
546
#endif
547
#if defined ZMQ_HAVE_TIPC
548 549 550 551
    if (addr->protocol == "tipc") {
        tipc_connecter_t *connecter = new (std::nothrow) tipc_connecter_t (
            io_thread, this, options, addr, wait_);
        alloc_assert (connecter);
Martin Hurton's avatar
Martin Hurton committed
552
        launch_child (connecter);
553 554 555 556
        return;
    }
#endif

557
#ifdef ZMQ_HAVE_OPENPGM
558

559
    //  Both PGM and EPGM transports are using the same infrastructure.
560
    if (addr->protocol == "pgm" || addr->protocol == "epgm") {
561

562 563 564
        zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
                 || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

565
        //  For EPGM transport with UDP encapsulation of PGM is used.
566
        bool const udp_encapsulation = addr->protocol == "epgm";
567 568 569 570 571 572 573

        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with PGM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {

            //  PGM sender.
574
            pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
575 576 577
                io_thread, options);
            alloc_assert (pgm_sender);

578
            int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
579
            errno_assert (rc == 0);
580 581 582

            send_attach (this, pgm_sender);
        }
583
        else {
584 585

            //  PGM receiver.
586
            pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
587 588 589
                io_thread, options);
            alloc_assert (pgm_receiver);

590
            int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
591
            errno_assert (rc == 0);
592 593 594 595 596 597 598

            send_attach (this, pgm_receiver);
        }

        return;
    }
#endif
Martin Hurton's avatar
Martin Hurton committed
599

bebopagogo's avatar
bebopagogo committed
600
#ifdef ZMQ_HAVE_NORM
Martin Hurton's avatar
Martin Hurton committed
601
    if (addr->protocol == "norm") {
bebopagogo's avatar
bebopagogo committed
602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629
        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with NORM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {

            //  NORM sender.
            norm_engine_t* norm_sender = new (std::nothrow) norm_engine_t(io_thread, options);
            alloc_assert (norm_sender);

            int rc = norm_sender->init (addr->address.c_str (), true, false);
            errno_assert (rc == 0);

            send_attach (this, norm_sender);
        }
        else {  // ZMQ_SUB or ZMQ_XSUB

            //  NORM receiver.
            norm_engine_t* norm_receiver = new (std::nothrow) norm_engine_t (io_thread, options);
            alloc_assert (norm_receiver);

            int rc = norm_receiver->init (addr->address.c_str (), false, true);
            errno_assert (rc == 0);

            send_attach (this, norm_receiver);
        }
        return;
    }
#endif // ZMQ_HAVE_NORM
630 631 632

    zmq_assert (false);
}
633