/*
    Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "session_base.hpp"
21
#include "i_engine.hpp"
22
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
23
#include "pipe.hpp"
24
#include "likely.hpp"
25
#include "tcp_connecter.hpp"
26
#include "ipc_connecter.hpp"
27 28
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
29
#include "address.hpp"
30

31
#include "ctx.hpp"
32 33 34 35
#include "req.hpp"

//  Factory for session objects. The concrete class is selected from
//  options_.type: ZMQ_REQ gets a req_session_t, every other recognised
//  socket type gets a plain session_base_t. For an unrecognised type errno
//  is set to EINVAL and NULL is returned. The allocation result is checked
//  with alloc_assert before being handed to the caller.
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
    bool connect_, class socket_base_t *socket_, const options_t &options_,
    const address_t *addr_)
{
    session_base_t *session = NULL;

    if (options_.type == ZMQ_REQ) {
        //  REQ sockets use their own session class.
        session = new (std::nothrow) req_session_t (io_thread_, connect_,
            socket_, options_, addr_);
    }
    else {
        switch (options_.type) {
        case ZMQ_DEALER:
        case ZMQ_REP:
        case ZMQ_ROUTER:
        case ZMQ_PUB:
        case ZMQ_XPUB:
        case ZMQ_SUB:
        case ZMQ_XSUB:
        case ZMQ_PUSH:
        case ZMQ_PULL:
        case ZMQ_PAIR:
        case ZMQ_STREAM:
            //  All remaining known socket types share the generic session.
            session = new (std::nothrow) session_base_t (io_thread_, connect_,
                socket_, options_, addr_);
            break;
        default:
            //  Unknown socket type.
            errno = EINVAL;
            return NULL;
        }
    }

    alloc_assert (session);
    return session;
}

//  Records the construction parameters and puts the session into its
//  initial state: no data pipe, no ZAP pipe, no engine, no termination
//  in progress and no linger timer armed. The address object passed in
//  is deleted by the destructor.
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
      bool connect_, class socket_base_t *socket_, const options_t &options_,
      const address_t *addr_) :
    //  Base sub-objects.
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    //  Whether this session initiates connections itself.
    connect (connect_),
    //  Pipes are attached or created later.
    pipe (NULL),
    zap_pipe (NULL),
    //  Message and termination state.
    incomplete_in (false),
    pending (false),
    //  Engine is attached later via process_attach.
    engine (NULL),
    socket (socket_),
    io_thread (io_thread_),
    has_linger_timer (false),
    addr (addr_)
{
}

84
//  Tears the session down. Both pipes must already be gone at this point.
//  A still-armed linger timer is cancelled, a still-attached engine is
//  terminated and the address object is released.
zmq::session_base_t::~session_base_t ()
{
    //  Pipes have to be detached before the session is destroyed.
    zmq_assert (!pipe);
    zmq_assert (!zap_pipe);

    //  Cancel the linger timer if it has not fired yet.
    if (has_linger_timer) {
        cancel_timer (linger_timer_id);
        has_linger_timer = false;
    }

    //  Shut down the engine, if one is still attached.
    if (engine != NULL)
        engine->terminate ();

    //  The session deletes the address it was constructed with.
    delete addr;
}
101

102
//  Installs pipe_ as the session's data pipe and registers the session as
//  the pipe's event sink. Asserts that the session is not terminating,
//  that no data pipe is attached yet and that pipe_ is not NULL.
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
{
    zmq_assert (!is_terminating ());
    zmq_assert (!pipe);
    zmq_assert (pipe_);

    pipe = pipe_;
    pipe_->set_event_sink (this);
}

111
//  Reads the next message from the data pipe into msg_.
//  Returns 0 on success. Returns -1 with errno set to EAGAIN when there is
//  no pipe or the pipe has nothing to read.
//  On success incomplete_in records whether the message carries the
//  'more' flag.
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
    const bool fetched = pipe != NULL && pipe->read (msg_);
    if (!fetched) {
        errno = EAGAIN;
        return -1;
    }

    incomplete_in = (msg_->flags () & msg_t::more) != 0;
    return 0;
}

122
//  Writes msg_ to the data pipe.
//  Returns 0 on success, in which case msg_ is re-initialised via init ().
//  Returns -1 with errno set to EAGAIN when there is no pipe or the pipe
//  refuses the write.
int zmq::session_base_t::push_msg (msg_t *msg_)
{
    if (!pipe || !pipe->write (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    //  The message was written to the pipe; reset the caller's message.
    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
//  Reads one message from the ZAP pipe into msg_.
//  Returns 0 on success. Returns -1 with errno ENOTCONN when no ZAP pipe
//  exists, or -1 with errno EAGAIN when the pipe has nothing to read.
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
    if (!zap_pipe) {
        errno = ENOTCONN;
        return -1;
    }

    if (zap_pipe->read (msg_))
        return 0;

    errno = EAGAIN;
    return -1;
}

//  Writes msg_ to the ZAP pipe.
//  Returns -1 with errno ENOTCONN when no ZAP pipe exists, 0 otherwise.
//  The write itself is asserted to succeed. The pipe is flushed once a
//  message without the 'more' flag has been written, and msg_ is
//  re-initialised before returning.
int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
    if (!zap_pipe) {
        errno = ENOTCONN;
        return -1;
    }

    const bool ok = zap_pipe->write (msg_);
    zmq_assert (ok);

    //  Flush only after a message that has no 'more' flag set.
    const bool more_follows = (msg_->flags () & msg_t::more) != 0;
    if (!more_follows)
        zap_pipe->flush ();

    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
167 168 169 170
//  Called from detached () before a reconnect is started. The base
//  implementation intentionally does nothing.
//  NOTE(review): presumably a hook for derived session classes to clear
//  their own state - confirm against the class declaration.
void zmq::session_base_t::reset ()
{
}

171
void zmq::session_base_t::flush ()
172
{
173 174
    if (pipe)
        pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
175 176
}

177
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
178
{
179
    if (pipe) {
180

181 182 183 184 185 186
        //  Get rid of half-processed messages in the out pipe. Flush any
        //  unflushed messages upstream.
        pipe->rollback ();
        pipe->flush ();

        //  Remove any half-read message from the in pipe.
187
        while (incomplete_in) {
188 189 190
            msg_t msg;
            int rc = msg.init ();
            errno_assert (rc == 0);
191 192
            rc = pull_msg (&msg);
            errno_assert (rc == 0);
193 194
            rc = msg.close ();
            errno_assert (rc == 0);
195 196
        }
    }
197 198
}

199
//  Pipe event: pipe_ has finished terminating.
//  pipe_ must be the data pipe, the ZAP pipe or one of the pipes recorded
//  in terminating_pipes; the matching reference is dropped. When the
//  session was waiting for its pipes to go away (pending) and none are
//  left, termination proceeds.
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
{
    //  Drop the reference to the deallocated pipe if required.
    zmq_assert (pipe_ == pipe
             || pipe_ == zap_pipe
             || terminating_pipes.count (pipe_) == 1);

    if (pipe_ == pipe) {
        //  If this is our current pipe, remove it.
        pipe = NULL;

        //  The linger timer exists only to force-terminate the data pipe.
        //  Once that pipe is gone the timer must not fire any more:
        //  timer_event () asserts that the pipe still exists.
        if (has_linger_timer) {
            cancel_timer (linger_timer_id);
            has_linger_timer = false;
        }
    }
    else
    if (pipe_ == zap_pipe) {
        zap_pipe = NULL;
    }
    else
        //  Remove the pipe from the detached pipes set.
        terminating_pipes.erase (pipe_);

    //  For raw sockets, losing a pipe while not already terminating shuts
    //  down the engine and the session itself.
    if (!is_terminating () && options.raw_sock) {
        if (engine) {
            engine->terminate ();
            engine = NULL;
        }
        terminate ();
    }

    //  If we are waiting for pending messages to be sent, at this point
    //  we are sure that there will be no more messages and we can proceed
    //  with termination safely.
    if (pending && !pipe && !zap_pipe && terminating_pipes.empty ())
        proceed_with_term ();
}

232
//  Pipe event: pipe_ has become readable.
//  Notifications for pipes that are neither the data pipe nor the ZAP pipe
//  are ignored; such a pipe must be in terminating_pipes. Without an
//  engine the data pipe is only checked for readable content. Otherwise
//  the engine is told to restart output (data pipe) or that a ZAP message
//  is available (ZAP pipe).
void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
    //  Skip activating if we're detaching this pipe.
    if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) {
        zmq_assert (terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (unlikely (engine == NULL)) {
        //  The notification may be for the ZAP pipe while no data pipe is
        //  attached, so the data pipe must not be dereferenced blindly.
        if (pipe)
            pipe->check_read ();
        return;
    }

    if (likely (pipe_ == pipe))
        engine->restart_output ();
    else
        engine->zap_msg_available ();
}

251
//  Pipe event: pipe_ has become writable.
//  For the current data pipe the engine, if any, is asked to restart
//  input. Any other pipe must be one that is being detached and the
//  notification is ignored.
void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
    if (pipe == pipe_) {
        if (engine != NULL)
            engine->restart_input ();
        return;
    }

    //  Not the current pipe: it has to be one we are detaching.
    zmq_assert (terminating_pipes.count (pipe_) == 1);
}

263
//  Pipe event that must never reach a session: hiccups travel from the
//  session to the socket only, so receiving one is treated as a fatal
//  error.
void zmq::session_base_t::hiccuped (pipe_t *)
{
    zmq_assert (false);
}

270
//  Returns the socket pointer this session was constructed with.
zmq::socket_base_t *zmq::session_base_t::get_socket ()
{
    return socket;
}

275
void zmq::session_base_t::process_plug ()
276
{
277 278
    if (connect)
        start_connecting (false);
279 280
}

281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
int zmq::session_base_t::zap_connect ()
{
    zmq_assert (zap_pipe == NULL);

    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
    if (peer.socket == NULL) {
        errno = ECONNREFUSED;
        return -1;
    }
    if (peer.options.type != ZMQ_REP
    &&  peer.options.type != ZMQ_ROUTER) {
        errno = ECONNREFUSED;
        return -1;
    }

    //  Create a bi-directional pipe that will connect
    //  session with zap socket.
    object_t *parents [2] = {this, peer.socket};
    pipe_t *new_pipes [2] = {NULL, NULL};
    int hwms [2] = {0, 0};
301
    bool conflates [2] = {false, false};
Ian Barber's avatar
Ian Barber committed
302
    int rc = pipepair (parents, new_pipes, hwms, conflates);
303 304 305 306
    errno_assert (rc == 0);

    //  Attach local end of the pipe to this socket object.
    zap_pipe = new_pipes [0];
Ian Barber's avatar
Ian Barber committed
307
    zap_pipe->set_nodelay ();
308 309
    zap_pipe->set_event_sink (this);

Ian Barber's avatar
Ian Barber committed
310
    new_pipes [1]->set_nodelay ();
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
    send_bind (peer.socket, new_pipes [1], false);

    //  Send empty identity if required by the peer.
    if (peer.options.recv_identity) {
        msg_t id;
        rc = id.init ();
        errno_assert (rc == 0);
        id.set_flags (msg_t::identity);
        bool ok = zap_pipe->write (&id);
        zmq_assert (ok);
        zap_pipe->flush ();
    }

    return 0;
}

327
//  Handles the 'attach' command: takes over engine_ and plugs it into the
//  session's I/O thread. If the session has no data pipe yet and is not
//  terminating, a pipe pair between the session and its socket is created
//  first. engine_ must not be NULL and no engine may be attached already.
void zmq::session_base_t::process_attach (i_engine *engine_)
{
    zmq_assert (engine_ != NULL);

    //  Create the pipe if it does not exist yet.
    if (!pipe && !is_terminating ()) {

        //  Conflation applies only when the option is set and the socket
        //  is one of the types listed below.
        bool conflate = false;
        if (options.conflate) {
            switch (options.type) {
            case ZMQ_DEALER:
            case ZMQ_PULL:
            case ZMQ_PUSH:
            case ZMQ_PUB:
            case ZMQ_SUB:
                conflate = true;
                break;
            default:
                break;
            }
        }

        //  With conflation both high-water marks are replaced by -1.
        int hwms [2] = {options.rcvhwm, options.sndhwm};
        if (conflate)
            hwms [0] = hwms [1] = -1;

        object_t *parents [2] = {this, socket};
        pipe_t *ends [2] = {NULL, NULL};
        bool conflates [2] = {conflate, conflate};
        int rc = pipepair (parents, ends, hwms, conflates);
        errno_assert (rc == 0);

        //  The session listens for events on its own end of the pipe.
        ends [0]->set_event_sink (this);

        //  Keep the local end.
        zmq_assert (!pipe);
        pipe = ends [0];

        //  The socket gets the other end.
        send_bind (socket, ends [1]);
    }

    //  Plug in the engine.
    zmq_assert (!engine);
    engine = engine_;
    engine->plug (io_thread, this);
}

366
void zmq::session_base_t::detach ()
367 368 369 370
{
    //  Engine is dead. Let's forget about it.
    engine = NULL;

371 372 373 374
    //  Remove any half-done messages from the pipes.
    clean_pipes ();

    //  Send the event to the derived class.
375
    detached ();
376

377
    //  Just in case there's only a delimiter in the pipe.
378 379
    if (pipe)
        pipe->check_read ();
380 381 382

    if (zap_pipe)
        zap_pipe->check_read ();
383 384
}

385
void zmq::session_base_t::process_term (int linger_)
386
{
387 388 389 390
    zmq_assert (!pending);

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
Ian Barber's avatar
Ian Barber committed
391
    //  standard termination immediately.
392
    if (!pipe && !zap_pipe) {
393 394 395 396 397 398
        proceed_with_term ();
        return;
    }

    pending = true;

399 400 401 402 403 404 405 406 407 408 409 410 411
    if (pipe != NULL) {
        //  If there's finite linger value, delay the termination.
        //  If linger is infinite (negative) we don't even have to set
        //  the timer.
        if (linger_ > 0) {
            zmq_assert (!has_linger_timer);
            add_timer (linger_, linger_timer_id);
            has_linger_timer = true;
        }

        //  Start pipe termination process. Delay the termination till all messages
        //  are processed in case the linger time is non-zero.
        pipe->terminate (linger_ != 0);
412

413 414 415 416 417
        //  TODO: Should this go into pipe_t::terminate ?
        //  In case there's no engine and there's only delimiter in the
        //  pipe it wouldn't be ever read. Thus we check for it explicitly.
        pipe->check_read ();
    }
418

419 420
    if (zap_pipe != NULL)
        zap_pipe->terminate (false);
421 422
}

423
void zmq::session_base_t::proceed_with_term ()
424
{
Martin Hurton's avatar
Martin Hurton committed
425
    //  The pending phase has just ended.
426 427 428
    pending = false;

    //  Continue with standard termination.
429
    own_t::process_term (0);
430 431
}

432
void zmq::session_base_t::timer_event (int id_)
433
{
Sergey KHripchenko's avatar
Sergey KHripchenko committed
434

435 436 437 438
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
    has_linger_timer = false;
439 440 441

    //  Ask pipe to terminate even though there may be pending messages in it.
    zmq_assert (pipe);
442
    pipe->terminate (false);
443 444
}

445
void zmq::session_base_t::detached ()
446
{
447 448
    //  Transient session self-destructs after peer disconnects.
    if (!connect) {
449 450 451 452
        terminate ();
        return;
    }

453 454
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
455
    if (pipe && options.immediate == 1
456 457 458
        && addr->protocol != "pgm" && addr->protocol != "epgm") {
        pipe->hiccup ();
        pipe->terminate (false);
459
        terminating_pipes.insert (pipe);
460 461 462
        pipe = NULL;
    }

463
    reset ();
464

465
    //  Reconnect.
Sergey KHripchenko's avatar
Sergey KHripchenko committed
466 467
    if (options.reconnect_ivl != -1)
        start_connecting (true);
468

469 470 471
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
    if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
Sergey KHripchenko's avatar
Sergey KHripchenko committed
472
        pipe->hiccup ();
473 474
}

475
void zmq::session_base_t::start_connecting (bool wait_)
476 477 478 479 480 481 482 483 484 485
{
    zmq_assert (connect);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.

486
    if (addr->protocol == "tcp") {
487
        tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (
488
            io_thread, this, options, addr, wait_);
489 490 491 492 493
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }

494
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
495
    if (addr->protocol == "ipc") {
496
        ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
497
            io_thread, this, options, addr, wait_);
498 499 500 501
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }
502
#endif
503

504
#ifdef ZMQ_HAVE_OPENPGM
505

506
    //  Both PGM and EPGM transports are using the same infrastructure.
507
    if (addr->protocol == "pgm" || addr->protocol == "epgm") {
508

509 510 511
        zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
                 || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

512
        //  For EPGM transport with UDP encapsulation of PGM is used.
513
        bool const udp_encapsulation = addr->protocol == "epgm";
514 515 516 517 518 519 520

        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with PGM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {

            //  PGM sender.
521
            pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
522 523 524
                io_thread, options);
            alloc_assert (pgm_sender);

525
            int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
526
            errno_assert (rc == 0);
527 528 529

            send_attach (this, pgm_sender);
        }
530
        else {
531 532

            //  PGM receiver.
533
            pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
534 535 536
                io_thread, options);
            alloc_assert (pgm_receiver);

537
            int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
538
            errno_assert (rc == 0);
539 540 541 542 543 544 545 546 547 548

            send_attach (this, pgm_receiver);
        }

        return;
    }
#endif

    zmq_assert (false);
}
549