/*
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2011 iMatix Corporation
    Copyright (c) 2011 VMware, Inc.
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

23
#include "session_base.hpp"
24
#include "socket_base.hpp"
25
#include "i_engine.hpp"
26
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
27
#include "pipe.hpp"
28
#include "likely.hpp"
29
#include "tcp_connecter.hpp"
30
#include "ipc_connecter.hpp"
31 32
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
33

34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
#include "req.hpp"
#include "xreq.hpp"
#include "rep.hpp"
#include "xrep.hpp"
#include "pub.hpp"
#include "xpub.hpp"
#include "sub.hpp"
#include "xsub.hpp"
#include "push.hpp"
#include "pull.hpp"
#include "pair.hpp"

//  Factory function: instantiates the session class that corresponds to the
//  socket type found in options_.type. All arguments are forwarded unchanged
//  to the constructor of the selected session class.
//
//  Returns the newly allocated session. If the socket type is not one of the
//  types listed below, errno is set to EINVAL and NULL is returned. The
//  result of the allocation is checked with alloc_assert.
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
    bool connect_, class socket_base_t *socket_, const options_t &options_,
    const char *protocol_, const char *address_)
{
    session_base_t *s = NULL;
    switch (options_.type) {
    case ZMQ_REQ:
        s = new (std::nothrow) req_session_t (io_thread_, connect_,
            socket_, options_, protocol_, address_);
        break;
    case ZMQ_XREQ:
        s = new (std::nothrow) xreq_session_t (io_thread_, connect_,
            socket_, options_, protocol_, address_);
        //  Every case has to end with a break. Without it the XREQ case
        //  would fall through into ZMQ_REP, leak the session allocated
        //  above and hand back a REP session instead.
        break;
    case ZMQ_REP:
        s = new (std::nothrow) rep_session_t (io_thread_, connect_,
            socket_, options_, protocol_, address_);
        break;
    case ZMQ_XREP:
        s = new (std::nothrow) xrep_session_t (io_thread_, connect_,
            socket_, options_, protocol_, address_);
        break;
    case ZMQ_PUB:
        s = new (std::nothrow) pub_session_t (io_thread_, connect_,
            socket_, options_, protocol_, address_);
        break;
    case ZMQ_XPUB:
        s = new (std::nothrow) xpub_session_t (io_thread_, connect_,
            socket_, options_, protocol_, address_);
        break;
    case ZMQ_SUB:
        s = new (std::nothrow) sub_session_t (io_thread_, connect_,
            socket_, options_, protocol_, address_);
        break;
    case ZMQ_XSUB:
        s = new (std::nothrow) xsub_session_t (io_thread_, connect_,
            socket_, options_, protocol_, address_);
        break;
    case ZMQ_PUSH:
        s = new (std::nothrow) push_session_t (io_thread_, connect_,
            socket_, options_, protocol_, address_);
        break;
    case ZMQ_PULL:
        s = new (std::nothrow) pull_session_t (io_thread_, connect_,
            socket_, options_, protocol_, address_);
        break;
    case ZMQ_PAIR:
        s = new (std::nothrow) pair_session_t (io_thread_, connect_,
            socket_, options_, protocol_, address_);
        break;
    default:
        //  Unknown socket type.
        errno = EINVAL;
        return NULL;
    }
    alloc_assert (s);
    return s;
}

//  Constructs a session bound to the given I/O thread and socket. The session
//  starts without a pipe and without an engine, and with no linger timer
//  armed. The protocol and address strings are stored only when supplied.
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
      bool connect_, class socket_base_t *socket_, const options_t &options_,
      const char *protocol_, const char *address_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    connect (connect_),
    pipe (NULL),
    incomplete_in (false),
    pending (false),
    engine (NULL),
    socket (socket_),
    io_thread (io_thread_),
    has_linger_timer (false)
{
    //  Either string may be NULL, in which case the corresponding member
    //  keeps its default-constructed value.
    if (protocol_ != NULL) {
        protocol = protocol_;
    }
    if (address_ != NULL) {
        address = address_;
    }
}

123
//  Destroys the session. The pipe must already be gone at this point; a
//  still-armed linger timer is cancelled and an attached engine is asked
//  to terminate.
zmq::session_base_t::~session_base_t ()
{
    zmq_assert (pipe == NULL);

    //  A linger timer that has not fired yet must not outlive the session.
    if (has_linger_timer) {
        cancel_timer (linger_timer_id);
        has_linger_timer = false;
    }

    //  Shut down the engine, if one is still attached.
    if (engine != NULL) {
        engine->terminate ();
    }
}
137

138
//  Adopts pipe_ as this session's pipe and registers the session as the
//  pipe's event sink. May only be called on a session that is not
//  terminating and that has no pipe yet; pipe_ must not be NULL.
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
{
    //  Preconditions.
    zmq_assert (!is_terminating ());
    zmq_assert (pipe == NULL);
    zmq_assert (pipe_ != NULL);

    //  Remember the pipe and start receiving its events.
    pipe = pipe_;
    pipe->set_event_sink (this);
}

147
//  Fetches the next message part from the pipe into msg_.
//
//  Returns 0 on success. Returns -1 with errno set to EAGAIN when there is
//  no pipe or the pipe has nothing to hand over. On success incomplete_in
//  records whether the part carries the 'more' flag, i.e. whether further
//  parts of the same message are still to be read.
int zmq::session_base_t::read (msg_t *msg_)
{
    const bool got_part = pipe && pipe->read (msg_);
    if (!got_part) {
        errno = EAGAIN;
        return -1;
    }

    incomplete_in = (msg_->flags () & msg_t::more) != 0;
    return 0;
}

158
//  Pushes msg_ into the pipe.
//
//  Returns 0 on success, in which case msg_ is re-initialised via init ().
//  Returns -1 with errno set to EAGAIN when there is no pipe or the pipe
//  refuses the message.
int zmq::session_base_t::write (msg_t *msg_)
{
    //  Failure path first: nothing to write to, or the pipe did not accept
    //  the message.
    if (!pipe || !pipe->write (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    //  The write went through; reset the caller's message object.
    int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

170
void zmq::session_base_t::flush ()
171
{
172 173
    if (pipe)
        pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
174 175
}

176
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
177
{
178
    if (pipe) {
179

180 181 182 183 184 185
        //  Get rid of half-processed messages in the out pipe. Flush any
        //  unflushed messages upstream.
        pipe->rollback ();
        pipe->flush ();

        //  Remove any half-read message from the in pipe.
186
        while (incomplete_in) {
187 188 189
            msg_t msg;
            int rc = msg.init ();
            errno_assert (rc == 0);
190 191 192 193
            if (!read (&msg)) {
                zmq_assert (!incomplete_in);
                break;
            }
194 195
            rc = msg.close ();
            errno_assert (rc == 0);
196 197
        }
    }
198 199
}

200
//  Pipe event: the pipe has been terminated. The session forgets about it
//  and, if a termination was waiting for pending messages, resumes it.
void zmq::session_base_t::terminated (pipe_t *pipe_)
{
    //  The notification must concern our own pipe; drop the now-stale
    //  pointer.
    zmq_assert (pipe_ == pipe);
    pipe = NULL;

    //  With the pipe gone no further messages can arrive, so a termination
    //  that was waiting for pending messages can now safely go ahead.
    if (!pending)
        return;
    proceed_with_term ();
}

213
//  Pipe event: the pipe has become readable. With an engine attached the
//  engine's output side is activated; otherwise the pipe is asked to check
//  for readable content itself.
void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
    zmq_assert (pipe_ == pipe);

    //  Common case: an engine is attached and can consume the data.
    if (likely (engine != NULL)) {
        engine->activate_out ();
        return;
    }

    //  No engine attached.
    pipe->check_read ();
}

223
//  Pipe event: the pipe has become writable. The engine's input side is
//  activated when an engine is attached; otherwise nothing happens.
void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
    zmq_assert (pipe_ == pipe);

    if (engine != NULL) {
        engine->activate_in ();
    }
}

231
//  Pipe event: hiccup. A session never expects to receive this event,
//  because hiccups travel from the session to the socket only; receiving
//  one trips an assertion.
void zmq::session_base_t::hiccuped (pipe_t *pipe_)
{
    zmq_assert (false);
}

238
void zmq::session_base_t::process_plug ()
239
{
240 241
    if (connect)
        start_connecting (false);
242 243
}

244
//  Handles the 'attach' command carrying a freshly created engine.
//
//  A NULL engine_ signals that the connection attempt failed before an
//  engine could be created; the session then reacts as if it had been
//  detached. Otherwise the session makes sure a pipe to the socket exists
//  (unless it is already terminating) and plugs the engine in.
void zmq::session_base_t::process_attach (i_engine *engine_)
{
    //  Some other object (e.g. init) reported a failed connection without
    //  producing an engine, so the reconnection logic has to kick in.
    if (engine_ == NULL) {
        zmq_assert (engine == NULL);
        detached ();
        return;
    }

    //  Lazily create the pipe pair between this session and the socket.
    if (pipe == NULL && !is_terminating ()) {
        object_t *owners [2] = {this, socket};
        pipe_t *ends [2] = {NULL, NULL};
        int marks [2] = {options.rcvhwm, options.sndhwm};
        bool delay_flags [2] = {options.delay_on_close,
            options.delay_on_disconnect};
        int rc = pipepair (owners, ends, marks, delay_flags);
        errno_assert (rc == 0);

        //  The first end is ours: receive its events and keep it.
        ends [0]->set_event_sink (this);
        zmq_assert (pipe == NULL);
        pipe = ends [0];

        //  The second end goes to the socket, which is asked to plug
        //  into it.
        send_bind (socket, ends [1]);
    }

    //  Take over the engine and plug it into our I/O thread.
    zmq_assert (engine == NULL);
    engine = engine_;
    engine->plug (io_thread, this);
}

280
void zmq::session_base_t::detach ()
281 282 283 284
{
    //  Engine is dead. Let's forget about it.
    engine = NULL;

285 286 287 288
    //  Remove any half-done messages from the pipes.
    clean_pipes ();

    //  Send the event to the derived class.
289
    detached ();
290

291
    //  Just in case there's only a delimiter in the pipe.
292 293
    if (pipe)
        pipe->check_read ();
294 295
}

296
void zmq::session_base_t::process_term (int linger_)
297
{
298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
    zmq_assert (!pending);

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
    //  stadard termination immediately.
    if (!pipe) {
        proceed_with_term ();
        return;
    }

    pending = true;

    //  If there's finite linger value, delay the termination.
    //  If linger is infinite (negative) we don't even have to set
    //  the timer.
    if (linger_ > 0) {
        zmq_assert (!has_linger_timer);
        add_timer (linger_, linger_timer_id);
        has_linger_timer = true;
317 318
    }

319 320 321 322 323
    //  Start pipe termination process. Delay the termination till all messages
    //  are processed in case the linger time is non-zero.
    pipe->terminate (linger_ != 0);

    //  TODO: Should this go into pipe_t::terminate ?
324 325 326 327 328
    //  In case there's no engine and there's only delimiter in the
    //  pipe it wouldn't be ever read. Thus we check for it explicitly.
    pipe->check_read ();
}

329
void zmq::session_base_t::proceed_with_term ()
330 331 332 333 334
{
    //  The pending phase have just ended.
    pending = false;

    //  Continue with standard termination.
335
    own_t::process_term (0);
336 337
}

338
void zmq::session_base_t::timer_event (int id_)
339 340 341 342 343
{
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
    has_linger_timer = false;
344 345 346

    //  Ask pipe to terminate even though there may be pending messages in it.
    zmq_assert (pipe);
347
    pipe->terminate (false);
348 349
}

350
void zmq::session_base_t::detached ()
351
{
352 353
    //  Transient session self-destructs after peer disconnects.
    if (!connect) {
354 355 356 357
        terminate ();
        return;
    }

358 359 360
    //  Reconnect.
    start_connecting (true);

361 362 363 364 365 366
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
    if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
        pipe->hiccup ();  
}

367
void zmq::session_base_t::start_connecting (bool wait_)
368 369 370 371 372 373 374 375 376 377
{
    zmq_assert (connect);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.

378
    if (protocol == "tcp") {
379
        tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (
380 381 382 383 384 385
            io_thread, this, options, address.c_str (), wait_);
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }

386
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
387 388 389
    if (protocol == "ipc") {
        ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
            io_thread, this, options, address.c_str (), wait_);
390 391 392 393
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }
394
#endif
395 396

#if defined ZMQ_HAVE_OPENPGM
397

398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
    //  Both PGM and EPGM transports are using the same infrastructure.
    if (protocol == "pgm" || protocol == "epgm") {

        //  For EPGM transport with UDP encapsulation of PGM is used.
        bool udp_encapsulation = (protocol == "epgm");

        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with PGM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {

            //  PGM sender.
            pgm_sender_t *pgm_sender =  new (std::nothrow) pgm_sender_t (
                io_thread, options);
            alloc_assert (pgm_sender);

            int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
            zmq_assert (rc == 0);

            send_attach (this, pgm_sender);
        }
        else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {

            //  PGM receiver.
            pgm_receiver_t *pgm_receiver =  new (std::nothrow) pgm_receiver_t (
                io_thread, options);
            alloc_assert (pgm_receiver);

            int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
            zmq_assert (rc == 0);

            send_attach (this, pgm_receiver);
        }
        else
            zmq_assert (false);

        return;
    }
#endif

    zmq_assert (false);
}
440