/*
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "session_base.hpp"
21
#include "i_engine.hpp"
22
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
23
#include "pipe.hpp"
24
#include "likely.hpp"
25
#include "tcp_connecter.hpp"
26
#include "ipc_connecter.hpp"
27
#include "tipc_connecter.hpp"
28 29
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
30
#include "address.hpp"
bebopagogo's avatar
bebopagogo committed
31
#include "norm_engine.hpp"
32

33
#include "ctx.hpp"
34 35 36
#include "req.hpp"

//  Factory for session objects. ZMQ_REQ sockets get a req_session_t, every
//  other socket type listed below gets a plain session_base_t. For any
//  other type errno is set to EINVAL and NULL is returned. A failed
//  allocation trips alloc_assert.
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
    bool active_, class socket_base_t *socket_, const options_t &options_,
    address_t *addr_)
{
    //  Reject socket types for which no session can be created.
    switch (options_.type) {
    case ZMQ_REQ:
    case ZMQ_DEALER:
    case ZMQ_REP:
    case ZMQ_ROUTER:
    case ZMQ_PUB:
    case ZMQ_XPUB:
    case ZMQ_SUB:
    case ZMQ_XSUB:
    case ZMQ_PUSH:
    case ZMQ_PULL:
    case ZMQ_PAIR:
    case ZMQ_STREAM:
        break;
    default:
        errno = EINVAL;
        return NULL;
    }

    //  Only REQ needs the specialised session class.
    session_base_t *session = NULL;
    if (options_.type == ZMQ_REQ)
        session = new (std::nothrow) req_session_t (io_thread_, active_,
            socket_, options_, addr_);
    else
        session = new (std::nothrow) session_base_t (io_thread_, active_,
            socket_, options_, addr_);

    alloc_assert (session);
    return session;
}

//  Constructs a session with no pipes, no engine and no linger timer.
//  The arguments are stored as given; 'addr_' is later deleted by the
//  destructor.
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
      bool active_, class socket_base_t *socket_, const options_t &options_,
      address_t *addr_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    //  Whether this session initiates the connection itself.
    active (active_),
    //  Neither the data pipe nor the ZAP pipe exists yet.
    pipe (NULL),
    zap_pipe (NULL),
    //  No partially read message, no termination in progress.
    incomplete_in (false),
    pending (false),
    //  The engine is attached later via process_attach.
    engine (NULL),
    socket (socket_),
    io_thread (io_thread_),
    has_linger_timer (false),
    addr (addr_)
{
}

87
//  Destroys the session. Both pipes must already have been released.
zmq::session_base_t::~session_base_t ()
{
    zmq_assert (pipe == NULL);
    zmq_assert (zap_pipe == NULL);

    //  A linger timer that is still armed has to be cancelled explicitly.
    if (has_linger_timer) {
        cancel_timer (linger_timer_id);
        has_linger_timer = false;
    }

    //  Shut down the engine if one is still attached.
    if (engine != NULL)
        engine->terminate ();

    //  Release the address object handed over in the constructor.
    delete addr;
}
104

105
//  Installs 'pipe_' as the session's data pipe and registers the session
//  as its event sink. The session must not be terminating and must not
//  have a data pipe yet.
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
{
    zmq_assert (!is_terminating ());
    zmq_assert (pipe == NULL);
    zmq_assert (pipe_ != NULL);

    pipe_->set_event_sink (this);
    pipe = pipe_;
}

114
//  Reads one message from the data pipe into 'msg_'.
//  Returns 0 on success. Returns -1 with errno set to EAGAIN when there
//  is no pipe or the pipe has nothing to read.
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
    const bool got_msg = pipe != NULL && pipe->read (msg_);
    if (!got_msg) {
        errno = EAGAIN;
        return -1;
    }

    //  Remember whether further parts of this message are still to come.
    incomplete_in = (msg_->flags () & msg_t::more) != 0;
    return 0;
}

126
//  Writes 'msg_' to the data pipe. On success 'msg_' is re-initialised
//  and 0 is returned. Returns -1 with errno set to EAGAIN when there is
//  no pipe or the write is refused.
int zmq::session_base_t::push_msg (msg_t *msg_)
{
    if (pipe == NULL || !pipe->write (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
//  Reads one message from the ZAP pipe into 'msg_'.
//  Returns 0 on success, -1 with errno ENOTCONN when there is no ZAP
//  pipe, or -1 with errno EAGAIN when nothing could be read.
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
    if (!zap_pipe) {
        errno = ENOTCONN;
        return -1;
    }

    const bool got_msg = zap_pipe->read (msg_);
    if (got_msg)
        return 0;

    errno = EAGAIN;
    return -1;
}

//  Writes 'msg_' to the ZAP pipe and re-initialises it afterwards.
//  Returns 0 on success, or -1 with errno ENOTCONN when there is no ZAP
//  pipe. A refused write trips an assertion.
int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
    if (!zap_pipe) {
        errno = ENOTCONN;
        return -1;
    }

    const bool written = zap_pipe->write (msg_);
    zmq_assert (written);

    //  Flush only once the part without the 'more' flag has been written.
    const bool last_part = (msg_->flags () & msg_t::more) == 0;
    if (last_part)
        zap_pipe->flush ();

    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
171 172 173 174
//  No-op in session_base_t; called from reconnect ().
void zmq::session_base_t::reset ()
{
}

175
void zmq::session_base_t::flush ()
176
{
177 178
    if (pipe)
        pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
179 180
}

181
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
182
{
Martin Hurton's avatar
Martin Hurton committed
183
    zmq_assert (pipe != NULL);
184

Martin Hurton's avatar
Martin Hurton committed
185 186 187 188
    //  Get rid of half-processed messages in the out pipe. Flush any
    //  unflushed messages upstream.
    pipe->rollback ();
    pipe->flush ();
189

Martin Hurton's avatar
Martin Hurton committed
190 191 192 193 194 195 196 197 198
    //  Remove any half-read message from the in pipe.
    while (incomplete_in) {
        msg_t msg;
        int rc = msg.init ();
        errno_assert (rc == 0);
        rc = pull_msg (&msg);
        errno_assert (rc == 0);
        rc = msg.close ();
        errno_assert (rc == 0);
199
    }
200 201
}

202
//  Called when 'pipe_' has finished terminating. Drops the session's
//  reference to it and, if a deferred termination was only waiting for
//  the pipes, continues with it.
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
{
    const bool is_data_pipe = pipe_ == pipe;
    const bool is_zap_pipe = pipe_ == zap_pipe;

    //  The pipe has to be one we know about.
    zmq_assert (is_data_pipe
             || is_zap_pipe
             || terminating_pipes.count (pipe_) == 1);

    if (is_data_pipe) {
        //  Our current data pipe is gone.
        pipe = NULL;
    }
    else if (is_zap_pipe) {
        zap_pipe = NULL;
    }
    else {
        //  One of the previously detached pipes has finished.
        terminating_pipes.erase (pipe_);
    }

    //  With raw sockets, a terminated pipe shuts down the engine and the
    //  session itself, unless the session is already terminating.
    if (options.raw_sock && !is_terminating ()) {
        if (engine != NULL) {
            engine->terminate ();
            engine = NULL;
        }
        terminate ();
    }

    //  If we are waiting for pending messages to be sent, at this point
    //  we are sure that there will be no more messages and we can proceed
    //  with termination safely.
    const bool all_pipes_gone =
        !pipe && !zap_pipe && terminating_pipes.empty ();
    if (pending && all_pipes_gone)
        proceed_with_term ();
}

234
//  Called when 'pipe_' has become readable. Notifications from pipes that
//  are being detached are ignored. Otherwise the engine is told either to
//  restart output (data pipe) or that a ZAP message is available (ZAP
//  pipe).
void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
    // Skip activating if we're detaching this pipe
    if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) {
        zmq_assert (terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (unlikely (engine == NULL)) {
        //  Without an engine nothing will read from the data pipe, so
        //  check it explicitly. The notification may have come from the
        //  ZAP pipe while the data pipe is already gone, hence the guard
        //  against a NULL data pipe.
        if (pipe)
            pipe->check_read ();
        return;
    }

    if (likely (pipe_ == pipe))
        engine->restart_output ();
    else
        engine->zap_msg_available ();
}

253
//  Called when 'pipe_' has become writable. If it is the current data
//  pipe and an engine is attached, the engine's input is restarted.
void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
    const bool is_current_pipe = pipe_ == pipe;

    //  Anything else must be a pipe that is being detached; ignore it.
    if (!is_current_pipe) {
        zmq_assert (terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (engine != NULL)
        engine->restart_input ();
}

265
//  Must never be called on a session: hiccups travel from the session to
//  the socket only, never in the opposite direction. Always asserts.
void zmq::session_base_t::hiccuped (pipe_t *)
{
    zmq_assert (false);
}

272
//  Returns the socket pointer this session was constructed with.
zmq::socket_base_t *zmq::session_base_t::get_socket ()
{
    socket_base_t *const owner = socket;
    return owner;
}

277
void zmq::session_base_t::process_plug ()
278
{
Martin Hurton's avatar
Martin Hurton committed
279
    if (active)
280
        start_connecting (false);
281 282
}

283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
int zmq::session_base_t::zap_connect ()
{
    zmq_assert (zap_pipe == NULL);

    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
    if (peer.socket == NULL) {
        errno = ECONNREFUSED;
        return -1;
    }
    if (peer.options.type != ZMQ_REP
    &&  peer.options.type != ZMQ_ROUTER) {
        errno = ECONNREFUSED;
        return -1;
    }

    //  Create a bi-directional pipe that will connect
    //  session with zap socket.
    object_t *parents [2] = {this, peer.socket};
    pipe_t *new_pipes [2] = {NULL, NULL};
    int hwms [2] = {0, 0};
303
    bool conflates [2] = {false, false};
Ian Barber's avatar
Ian Barber committed
304
    int rc = pipepair (parents, new_pipes, hwms, conflates);
305 306 307 308
    errno_assert (rc == 0);

    //  Attach local end of the pipe to this socket object.
    zap_pipe = new_pipes [0];
Ian Barber's avatar
Ian Barber committed
309
    zap_pipe->set_nodelay ();
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
    zap_pipe->set_event_sink (this);

    send_bind (peer.socket, new_pipes [1], false);

    //  Send empty identity if required by the peer.
    if (peer.options.recv_identity) {
        msg_t id;
        rc = id.init ();
        errno_assert (rc == 0);
        id.set_flags (msg_t::identity);
        bool ok = zap_pipe->write (&id);
        zmq_assert (ok);
        zap_pipe->flush ();
    }

    return 0;
}

328
//  Handles the 'attach' command: creates the session<->socket pipe pair
//  if the session has no data pipe yet and is not terminating, then
//  plugs 'engine_' into the session. No engine may be attached already.
void zmq::session_base_t::process_attach (i_engine *engine_)
{
    zmq_assert (engine_ != NULL);

    //  Create the pipe if it does not exist yet.
    if (pipe == NULL && !is_terminating ()) {
        object_t *owners [2] = {this, socket};
        pipe_t *ends [2] = {NULL, NULL};

        //  Conflation is honoured only for these socket types.
        const bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        //  Conflating pipes are created with a high-water mark of -1.
        int hwms [2] = {conflate ? -1 : options.rcvhwm,
            conflate ? -1 : options.sndhwm};
        bool conflates [2] = {conflate, conflate};
        int rc = pipepair (owners, ends, hwms, conflates);
        errno_assert (rc == 0);

        pipe_t *const local_end = ends [0];
        pipe_t *const remote_end = ends [1];

        //  Plug the local end of the pipe.
        local_end->set_event_sink (this);

        //  Remember the local end of the pipe.
        zmq_assert (!pipe);
        pipe = local_end;

        //  Store engine assoc_fd for linking pipe to fd.
        pipe->assoc_fd = engine_->get_assoc_fd ();
        remote_end->assoc_fd = pipe->assoc_fd;

        //  Ask socket to plug into the remote end of the pipe.
        send_bind (socket, remote_end);
    }

    //  Plug in the engine.
    zmq_assert (engine == NULL);
    engine = engine_;
    engine->plug (io_thread, this);
}

Martin Hurton's avatar
Martin Hurton committed
369
void zmq::session_base_t::engine_error ()
370 371 372 373
{
    //  Engine is dead. Let's forget about it.
    engine = NULL;

374
    //  Remove any half-done messages from the pipes.
Martin Hurton's avatar
Martin Hurton committed
375 376
    if (pipe)
        clean_pipes ();
377

Martin Hurton's avatar
Martin Hurton committed
378 379 380 381
    if (active)
        reconnect ();
    else
        terminate ();
382

383
    //  Just in case there's only a delimiter in the pipe.
384 385
    if (pipe)
        pipe->check_read ();
386 387 388

    if (zap_pipe)
        zap_pipe->check_read ();
389 390
}

391
void zmq::session_base_t::process_term (int linger_)
392
{
393 394 395 396
    zmq_assert (!pending);

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
Ian Barber's avatar
Ian Barber committed
397
    //  standard termination immediately.
398
    if (!pipe && !zap_pipe) {
399 400 401 402 403 404
        proceed_with_term ();
        return;
    }

    pending = true;

405 406 407 408 409 410 411 412 413 414 415 416 417
    if (pipe != NULL) {
        //  If there's finite linger value, delay the termination.
        //  If linger is infinite (negative) we don't even have to set
        //  the timer.
        if (linger_ > 0) {
            zmq_assert (!has_linger_timer);
            add_timer (linger_, linger_timer_id);
            has_linger_timer = true;
        }

        //  Start pipe termination process. Delay the termination till all messages
        //  are processed in case the linger time is non-zero.
        pipe->terminate (linger_ != 0);
418

419 420 421 422 423
        //  TODO: Should this go into pipe_t::terminate ?
        //  In case there's no engine and there's only delimiter in the
        //  pipe it wouldn't be ever read. Thus we check for it explicitly.
        pipe->check_read ();
    }
424

425 426
    if (zap_pipe != NULL)
        zap_pipe->terminate (false);
427 428
}

429
void zmq::session_base_t::proceed_with_term ()
430
{
Martin Hurton's avatar
Martin Hurton committed
431
    //  The pending phase has just ended.
432 433 434
    pending = false;

    //  Continue with standard termination.
435
    own_t::process_term (0);
436 437
}

438
void zmq::session_base_t::timer_event (int id_)
439 440 441 442 443
{
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
    has_linger_timer = false;
444 445 446

    //  Ask pipe to terminate even though there may be pending messages in it.
    zmq_assert (pipe);
447
    pipe->terminate (false);
448 449
}

Martin Hurton's avatar
Martin Hurton committed
450
void zmq::session_base_t::reconnect ()
451
{
452 453
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
Richard Newton's avatar
Richard Newton committed
454
    if (pipe && options.immediate == 1 
bebopagogo's avatar
bebopagogo committed
455 456
        && addr->protocol != "pgm" && addr->protocol != "epgm" 
        && addr->protocol != "norm") {
457 458
        pipe->hiccup ();
        pipe->terminate (false);
459
        terminating_pipes.insert (pipe);
460 461 462
        pipe = NULL;
    }

463
    reset ();
464

465
    //  Reconnect.
Sergey KHripchenko's avatar
Sergey KHripchenko committed
466 467
    if (options.reconnect_ivl != -1)
        start_connecting (true);
468

469 470 471
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
    if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
Sergey KHripchenko's avatar
Sergey KHripchenko committed
472
        pipe->hiccup ();
473 474
}

475
void zmq::session_base_t::start_connecting (bool wait_)
476
{
Martin Hurton's avatar
Martin Hurton committed
477
    zmq_assert (active);
478 479 480 481 482 483 484 485

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.

486
    if (addr->protocol == "tcp") {
487
        tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (
488
            io_thread, this, options, addr, wait_);
489 490 491 492 493
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }

494
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
495
    if (addr->protocol == "ipc") {
496
        ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
497
            io_thread, this, options, addr, wait_);
498 499 500 501
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }
502
#endif
503
#if defined ZMQ_HAVE_TIPC
504 505 506 507
    if (addr->protocol == "tipc") {
        tipc_connecter_t *connecter = new (std::nothrow) tipc_connecter_t (
            io_thread, this, options, addr, wait_);
        alloc_assert (connecter);
Martin Hurton's avatar
Martin Hurton committed
508
        launch_child (connecter);
509 510 511 512
        return;
    }
#endif

513
#ifdef ZMQ_HAVE_OPENPGM
514

515
    //  Both PGM and EPGM transports are using the same infrastructure.
516
    if (addr->protocol == "pgm" || addr->protocol == "epgm") {
517

518 519 520
        zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
                 || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

521
        //  For EPGM transport with UDP encapsulation of PGM is used.
522
        bool const udp_encapsulation = addr->protocol == "epgm";
523 524 525 526 527 528 529

        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with PGM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {

            //  PGM sender.
530
            pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
531 532 533
                io_thread, options);
            alloc_assert (pgm_sender);

534
            int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
535
            errno_assert (rc == 0);
536 537 538

            send_attach (this, pgm_sender);
        }
539
        else {
540 541

            //  PGM receiver.
542
            pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
543 544 545
                io_thread, options);
            alloc_assert (pgm_receiver);

546
            int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
547
            errno_assert (rc == 0);
548 549 550 551 552 553 554

            send_attach (this, pgm_receiver);
        }

        return;
    }
#endif
bebopagogo's avatar
bebopagogo committed
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586
    
#ifdef ZMQ_HAVE_NORM
    if (addr->protocol == "norm")
    {
        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with NORM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {

            //  NORM sender.
            norm_engine_t* norm_sender = new (std::nothrow) norm_engine_t(io_thread, options);
            alloc_assert (norm_sender);

            int rc = norm_sender->init (addr->address.c_str (), true, false);
            errno_assert (rc == 0);

            send_attach (this, norm_sender);
        }
        else {  // ZMQ_SUB or ZMQ_XSUB

            //  NORM receiver.
            norm_engine_t* norm_receiver = new (std::nothrow) norm_engine_t (io_thread, options);
            alloc_assert (norm_receiver);

            int rc = norm_receiver->init (addr->address.c_str (), false, true);
            errno_assert (rc == 0);

            send_attach (this, norm_receiver);
        }
        return;
    }
#endif // ZMQ_HAVE_NORM
587 588 589

    zmq_assert (false);
}
590