/*
    Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "session_base.hpp"
21
#include "i_engine.hpp"
22
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
23
#include "pipe.hpp"
24
#include "likely.hpp"
25
#include "tcp_connecter.hpp"
26
#include "ipc_connecter.hpp"
27 28
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
29
#include "address.hpp"
30

31
#include "req.hpp"
32
#include "dealer.hpp"
33
#include "rep.hpp"
34
#include "router.hpp"
35 36 37 38 39 40 41 42 43 44
#include "pub.hpp"
#include "xpub.hpp"
#include "sub.hpp"
#include "xsub.hpp"
#include "push.hpp"
#include "pull.hpp"
#include "pair.hpp"

//  Factory: instantiates the session class that matches the socket type
//  stored in options_.type. All arguments are forwarded unchanged to the
//  constructor of the selected class.
//
//  Returns the new session. If the socket type is not one of those handled
//  below, errno is set to EINVAL and NULL is returned. The result of the
//  (non-throwing) allocation is checked with alloc_assert.
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
    bool connect_, class socket_base_t *socket_, const options_t &options_,
    const address_t *addr_)
{
    session_base_t *session = NULL;

    switch (options_.type) {
    case ZMQ_REQ:
        session = new (std::nothrow) req_session_t (
            io_thread_, connect_, socket_, options_, addr_);
        break;
    case ZMQ_DEALER:
        session = new (std::nothrow) dealer_session_t (
            io_thread_, connect_, socket_, options_, addr_);
        break;
    case ZMQ_REP:
        session = new (std::nothrow) rep_session_t (
            io_thread_, connect_, socket_, options_, addr_);
        break;
    case ZMQ_ROUTER:
        session = new (std::nothrow) router_session_t (
            io_thread_, connect_, socket_, options_, addr_);
        break;
    case ZMQ_PUB:
        session = new (std::nothrow) pub_session_t (
            io_thread_, connect_, socket_, options_, addr_);
        break;
    case ZMQ_XPUB:
        session = new (std::nothrow) xpub_session_t (
            io_thread_, connect_, socket_, options_, addr_);
        break;
    case ZMQ_SUB:
        session = new (std::nothrow) sub_session_t (
            io_thread_, connect_, socket_, options_, addr_);
        break;
    case ZMQ_XSUB:
        session = new (std::nothrow) xsub_session_t (
            io_thread_, connect_, socket_, options_, addr_);
        break;
    case ZMQ_PUSH:
        session = new (std::nothrow) push_session_t (
            io_thread_, connect_, socket_, options_, addr_);
        break;
    case ZMQ_PULL:
        session = new (std::nothrow) pull_session_t (
            io_thread_, connect_, socket_, options_, addr_);
        break;
    case ZMQ_PAIR:
        session = new (std::nothrow) pair_session_t (
            io_thread_, connect_, socket_, options_, addr_);
        break;
    default:
        //  Socket type without a matching session class.
        errno = EINVAL;
        return NULL;
    }

    alloc_assert (session);
    return session;
}

//  Constructs a session with no pipe and no engine attached yet.
//
//  io_thread_  thread handed to both base classes and remembered for
//              plugging the engine later on.
//  connect_    stored in 'connect'; process_plug () starts connecting
//              when it is true.
//  socket_     socket this session belongs to.
//  options_    options forwarded to own_t.
//  addr_       peer address; the destructor deletes it, so the session
//              takes ownership of the object.
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
      bool connect_, class socket_base_t *socket_, const options_t &options_,
      const address_t *addr_) :
    //  Base classes.
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    //  Connection mode and pipe/engine state.
    connect (connect_),
    pipe (NULL),
    incomplete_in (false),
    pending (false),
    engine (NULL),
    //  Environment the session lives in.
    socket (socket_),
    io_thread (io_thread_),
    //  Linger handling and peer address.
    has_linger_timer (false),
    addr (addr_)
{
}

118
//  Destructor. Expects the pipe to be gone already, cancels a linger timer
//  that is still armed, terminates the engine (if any) and releases the
//  address object handed over at construction time.
zmq::session_base_t::~session_base_t ()
{
    //  The pipe has to be detached before the session is destroyed.
    zmq_assert (pipe == NULL);

    //  Drop the linger timer in case it is still registered.
    if (has_linger_timer) {
        cancel_timer (linger_timer_id);
        has_linger_timer = false;
    }

    //  Shut down the engine, if one is attached.
    if (engine != NULL)
        engine->terminate ();

    //  Deleting a null pointer is a no-op, so no explicit check is needed.
    delete addr;
}
135

136
//  Installs pipe_ as the session's pipe and registers the session as the
//  pipe's event sink. Must not be called while the session is terminating
//  or while another pipe is already attached; pipe_ must be non-NULL.
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
{
    //  Preconditions.
    zmq_assert (!is_terminating ());
    zmq_assert (pipe == NULL);
    zmq_assert (pipe_ != NULL);

    //  Remember the pipe and start receiving its events.
    pipe = pipe_;
    pipe->set_event_sink (this);
}

145
//  Reads one message from the pipe into msg_.
//
//  Returns 0 on success. Returns -1 with errno set to EAGAIN when there is
//  no pipe or the pipe has nothing to read. On success 'incomplete_in'
//  records whether the message carries the 'more' flag, i.e. whether
//  further parts of the same multi-part message are still to come.
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
    const bool got_msg = pipe != NULL && pipe->read (msg_);
    if (!got_msg) {
        errno = EAGAIN;
        return -1;
    }

    //  Track whether we are in the middle of a multi-part message.
    incomplete_in = (msg_->flags () & msg_t::more) != 0;
    return 0;
}

156
//  Writes msg_ to the pipe.
//
//  Returns 0 on success, in which case msg_ is re-initialised via init ().
//  Returns -1 with errno set to EAGAIN when there is no pipe or the pipe
//  refuses the write; msg_ is left untouched by this function in that case.
int zmq::session_base_t::push_msg (msg_t *msg_)
{
    if (pipe == NULL || !pipe->write (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    //  The write succeeded; hand the caller back a freshly initialised
    //  message object.
    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
168 169 170 171
//  Hook invoked from detached () right before a reconnect is attempted.
//  The base implementation intentionally does nothing.
//  NOTE(review): presumably overridden by derived session classes that keep
//  per-connection state -- confirm against the header.
void zmq::session_base_t::reset ()
{
}

172
void zmq::session_base_t::flush ()
173
{
174 175
    if (pipe)
        pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
176 177
}

178
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
179
{
180
    if (pipe) {
181

182 183 184 185 186 187
        //  Get rid of half-processed messages in the out pipe. Flush any
        //  unflushed messages upstream.
        pipe->rollback ();
        pipe->flush ();

        //  Remove any half-read message from the in pipe.
188
        while (incomplete_in) {
189 190 191
            msg_t msg;
            int rc = msg.init ();
            errno_assert (rc == 0);
192 193
            rc = pull_msg (&msg);
            errno_assert (rc == 0);
194 195
            rc = msg.close ();
            errno_assert (rc == 0);
196 197
        }
    }
198 199
}

200
//  Pipe event: pipe_ has finished terminating.
//
//  pipe_ must be either the session's current pipe or one of the pipes
//  previously moved to 'terminating_pipes'. The reference to it is dropped.
//  For raw sockets the session (and its engine) is terminated as well.
//  If termination was pending on the pipes and none are left, termination
//  proceeds.
void zmq::session_base_t::terminated (pipe_t *pipe_)
{
    //  Drop the reference to the deallocated pipe if required.
    zmq_assert (pipe == pipe_ || terminating_pipes.count (pipe_) == 1);

    if (pipe == pipe_) {
        //  If this is our current pipe, remove it.
        pipe = NULL;

        //  The linger timer exists only to force termination of the
        //  current pipe. With the pipe gone it has no purpose any more,
        //  and letting it fire would trip the zmq_assert (pipe) in
        //  timer_event (). Cancel it here.
        if (has_linger_timer) {
            cancel_timer (linger_timer_id);
            has_linger_timer = false;
        }
    }
    else
        //  Remove the pipe from the detached pipes set.
        terminating_pipes.erase (pipe_);

    //  A raw-socket session does not outlive its pipe: shut down the
    //  engine and ask for termination of the session itself.
    if (!is_terminating () && options.raw_sock) {
        if (engine) {
            engine->terminate ();
            engine = NULL;
        }
        terminate ();
    }

    //  If we are waiting for pending messages to be sent, at this point
    //  we are sure that there will be no more messages and we can proceed
    //  with termination safely.
    if (pending && !pipe && terminating_pipes.empty ())
        proceed_with_term ();
}

227
//  Pipe event: pipe_ has become readable.
//
//  Events from a pipe that is being detached are ignored (such a pipe must
//  be registered in 'terminating_pipes'). For the current pipe the engine's
//  output side is activated; when no engine is attached the pipe is asked
//  to check for readable data itself.
void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
    //  Not the current pipe: it has to be one we are detaching. Skip it.
    if (pipe_ != pipe) {
        zmq_assert (terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (likely (engine != NULL)) {
        engine->activate_out ();
    }
    else {
        pipe->check_read ();
    }
}

241
//  Pipe event: pipe_ has become writable.
//
//  Events from a pipe that is being detached are ignored (such a pipe must
//  be registered in 'terminating_pipes'). For the current pipe the engine's
//  input side is activated, provided an engine is attached.
void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
    //  Not the current pipe: it has to be one we are detaching. Skip it.
    if (pipe_ != pipe) {
        zmq_assert (terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (engine != NULL)
        engine->activate_in ();
}

253
//  Pipe event: hiccup. A session is never supposed to receive one, because
//  hiccups travel from the session to the socket only; getting here is
//  treated as a fatal error.
void zmq::session_base_t::hiccuped (pipe_t *)
{
    zmq_assert (false);
}

260
//  Returns the socket passed to the constructor.
zmq::socket_base_t *zmq::session_base_t::get_socket ()
{
    socket_base_t *const owner_socket = socket;
    return owner_socket;
}

265
void zmq::session_base_t::process_plug ()
266
{
267 268
    if (connect)
        start_connecting (false);
269 270
}

271
//  Attach command handler: takes over engine_ (must be non-NULL) and plugs
//  it into the session's I/O thread.
//
//  If the session has no pipe yet and is not terminating, a pipe pair is
//  created first: the local end is kept by the session, the remote end is
//  sent to the socket with a bind command.
void zmq::session_base_t::process_attach (i_engine *engine_)
{
    zmq_assert (engine_ != NULL);

    //  Create the pipe if it does not exist yet.
    const bool pipe_needed = !pipe && !is_terminating ();
    if (pipe_needed) {
        //  Index 0 describes the session's end, index 1 the socket's end.
        object_t *owners [2] = {this, socket};
        pipe_t *ends [2] = {NULL, NULL};
        int high_water_marks [2] = {options.rcvhwm, options.sndhwm};
        bool delay_flags [2] = {options.delay_on_close,
            options.delay_on_disconnect};

        const int rc = pipepair (owners, ends, high_water_marks, delay_flags);
        errno_assert (rc == 0);

        //  Plug the local end of the pipe.
        ends [0]->set_event_sink (this);

        //  Remember the local end of the pipe.
        zmq_assert (!pipe);
        pipe = ends [0];

        //  Ask socket to plug into the remote end of the pipe.
        send_bind (socket, ends [1]);
    }

    //  Plug in the engine. There must not be one attached already.
    zmq_assert (engine == NULL);
    engine = engine_;
    engine->plug (io_thread, this);
}

301
void zmq::session_base_t::detach ()
302 303 304 305
{
    //  Engine is dead. Let's forget about it.
    engine = NULL;

306 307 308 309
    //  Remove any half-done messages from the pipes.
    clean_pipes ();

    //  Send the event to the derived class.
310
    detached ();
311

312
    //  Just in case there's only a delimiter in the pipe.
313 314
    if (pipe)
        pipe->check_read ();
315 316
}

317
void zmq::session_base_t::process_term (int linger_)
318
{
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
    zmq_assert (!pending);

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
    //  stadard termination immediately.
    if (!pipe) {
        proceed_with_term ();
        return;
    }

    pending = true;

    //  If there's finite linger value, delay the termination.
    //  If linger is infinite (negative) we don't even have to set
    //  the timer.
    if (linger_ > 0) {
        zmq_assert (!has_linger_timer);
        add_timer (linger_, linger_timer_id);
        has_linger_timer = true;
338 339
    }

340 341 342 343 344
    //  Start pipe termination process. Delay the termination till all messages
    //  are processed in case the linger time is non-zero.
    pipe->terminate (linger_ != 0);

    //  TODO: Should this go into pipe_t::terminate ?
345 346 347 348 349
    //  In case there's no engine and there's only delimiter in the
    //  pipe it wouldn't be ever read. Thus we check for it explicitly.
    pipe->check_read ();
}

350
void zmq::session_base_t::proceed_with_term ()
351 352 353 354 355
{
    //  The pending phase have just ended.
    pending = false;

    //  Continue with standard termination.
356
    own_t::process_term (0);
357 358
}

359
void zmq::session_base_t::timer_event (int id_)
360
{
Sergey KHripchenko's avatar
Sergey KHripchenko committed
361

362 363 364 365
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
    has_linger_timer = false;
366 367 368

    //  Ask pipe to terminate even though there may be pending messages in it.
    zmq_assert (pipe);
369
    pipe->terminate (false);
370 371
}

372
void zmq::session_base_t::detached ()
373
{
374 375
    //  Transient session self-destructs after peer disconnects.
    if (!connect) {
376 377 378 379
        terminate ();
        return;
    }

380 381
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
382
    if (pipe && options.immediate == 1
383 384 385
        && addr->protocol != "pgm" && addr->protocol != "epgm") {
        pipe->hiccup ();
        pipe->terminate (false);
386
        terminating_pipes.insert (pipe);
387 388 389
        pipe = NULL;
    }

390
    reset ();
391

392
    //  Reconnect.
Sergey KHripchenko's avatar
Sergey KHripchenko committed
393 394
    if (options.reconnect_ivl != -1)
        start_connecting (true);
395

396 397 398
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
    if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
Sergey KHripchenko's avatar
Sergey KHripchenko committed
399
        pipe->hiccup ();
400 401
}

402
void zmq::session_base_t::start_connecting (bool wait_)
403 404 405 406 407 408 409 410 411 412
{
    zmq_assert (connect);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.

413
    if (addr->protocol == "tcp") {
414
        tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (
415
            io_thread, this, options, addr, wait_);
416 417 418 419 420
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }

421
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
422
    if (addr->protocol == "ipc") {
423
        ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
424
            io_thread, this, options, addr, wait_);
425 426 427 428
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }
429
#endif
430

431
#ifdef ZMQ_HAVE_OPENPGM
432

433
    //  Both PGM and EPGM transports are using the same infrastructure.
434
    if (addr->protocol == "pgm" || addr->protocol == "epgm") {
435

436 437 438
        zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
                 || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

439
        //  For EPGM transport with UDP encapsulation of PGM is used.
440
        bool const udp_encapsulation = addr->protocol == "epgm";
441 442 443 444 445 446 447

        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with PGM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {

            //  PGM sender.
448
            pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
449 450 451
                io_thread, options);
            alloc_assert (pgm_sender);

452
            int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
453
            errno_assert (rc == 0);
454 455 456

            send_attach (this, pgm_sender);
        }
457
        else {
458 459

            //  PGM receiver.
460
            pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
461 462 463
                io_thread, options);
            alloc_assert (pgm_receiver);

464
            int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
465
            errno_assert (rc == 0);
466 467 468 469 470 471 472 473 474 475

            send_attach (this, pgm_receiver);
        }

        return;
    }
#endif

    zmq_assert (false);
}
476