session_base.cpp 22.4 KB
Newer Older
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#include "session_base.hpp"
33
#include "i_engine.hpp"
34
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "pipe.hpp"
36
#include "likely.hpp"
37
#include "tcp_connecter.hpp"
38
#include "ipc_connecter.hpp"
39
#include "tipc_connecter.hpp"
40
#include "socks_connecter.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
41
#include "vmci_connecter.hpp"
42 43
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
44
#include "address.hpp"
bebopagogo's avatar
bebopagogo committed
45
#include "norm_engine.hpp"
46
#include "udp_engine.hpp"
47

48
#include "ctx.hpp"
49
#include "req.hpp"
50 51
#include "radio.hpp"
#include "dish.hpp"
52 53

//  Factory for session objects. The concrete class is picked from the socket
//  type stored in options_: REQ, RADIO and DISH sockets get their dedicated
//  session classes, every other listed socket type gets a plain
//  session_base_t. For a socket type that is not listed, NULL is returned
//  with errno set to EINVAL. The allocation result is passed through
//  alloc_assert before it is returned.
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
                                                  bool active_,
                                                  class socket_base_t *socket_,
                                                  const options_t &options_,
                                                  address_t *addr_)
{
    session_base_t *s = NULL;

    switch (options_.type) {
        //  Socket types served by the generic session.
        case ZMQ_DEALER:
        case ZMQ_REP:
        case ZMQ_ROUTER:
        case ZMQ_PUB:
        case ZMQ_XPUB:
        case ZMQ_SUB:
        case ZMQ_XSUB:
        case ZMQ_PUSH:
        case ZMQ_PULL:
        case ZMQ_PAIR:
        case ZMQ_STREAM:
        case ZMQ_SERVER:
        case ZMQ_CLIENT:
        case ZMQ_GATHER:
        case ZMQ_SCATTER:
        case ZMQ_DGRAM:
            s = new (std::nothrow)
              session_base_t (io_thread_, active_, socket_, options_, addr_);
            break;

        //  Socket types that come with a specialised session class.
        case ZMQ_REQ:
            s = new (std::nothrow)
              req_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_RADIO:
            s = new (std::nothrow)
              radio_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_DISH:
            s = new (std::nothrow)
              dish_session_t (io_thread_, active_, socket_, options_, addr_);
            break;

        //  Anything else is rejected.
        default:
            errno = EINVAL;
            return NULL;
    }

    alloc_assert (s);
    return s;
}

//  Builds a session that initially has no data pipe, no ZAP pipe and no
//  engine attached. active_ is stored and later decides whether
//  process_plug () starts connecting. addr_ is stored and released with
//  LIBZMQ_DELETE in the destructor.
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
                                     bool active_,
                                     class socket_base_t *socket_,
                                     const options_t &options_,
                                     address_t *addr_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    _active (active_),
    //  Nothing is attached yet.
    _pipe (NULL),
    _zap_pipe (NULL),
    //  No half-read message and no termination in progress.
    _incomplete_in (false),
    _pending (false),
    _engine (NULL),
    _socket (socket_),
    _io_thread (io_thread_),
    _has_linger_timer (false),
    _addr (addr_)
{
}

120 121
const char *zmq::session_base_t::get_endpoint () const
{
122
    return _engine->get_endpoint ();
123 124
}

125
//  Destroys the session. Both the data pipe and the ZAP pipe must already
//  be detached (asserted). A still-armed linger timer is cancelled, the
//  engine (if any) is asked to terminate and the address object is deleted.
zmq::session_base_t::~session_base_t ()
{
    zmq_assert (!_pipe);
    zmq_assert (!_zap_pipe);

    //  A linger timer may still be armed; disarm it before going away.
    if (_has_linger_timer) {
        cancel_timer (linger_timer_id);
        _has_linger_timer = false;
    }

    //  Shut the engine down if one is still attached.
    if (_engine != NULL) {
        _engine->terminate ();
    }

    LIBZMQ_DELETE (_addr);
}
142

143
//  Installs pipe_ as the session's data pipe and registers the session as
//  the pipe's event sink. Asserts that the session is not terminating, that
//  no data pipe is attached yet and that pipe_ is not NULL.
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
{
    zmq_assert (!is_terminating ());
    zmq_assert (!_pipe);
    zmq_assert (pipe_);

    //  Remember the pipe, then start receiving its events.
    _pipe = pipe_;
    pipe_->set_event_sink (this);
}

152
//  Reads the next message from the data pipe into msg_.
//  Returns 0 on success. Returns -1 with errno set to EAGAIN when there is
//  no data pipe or the pipe has nothing to read. On success the session
//  records whether the message carries the 'more' flag, i.e. whether a
//  multi-part message has been read only partially.
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
    const bool got_message = _pipe && _pipe->read (msg_);
    if (!got_message) {
        errno = EAGAIN;
        return -1;
    }

    const bool more_follows = (msg_->flags () & msg_t::more) != 0;
    _incomplete_in = more_follows;

    return 0;
}

164
//  Writes msg_ to the data pipe.
//  Command messages are dropped here (reported as success, 0) unless they
//  are subscribe or cancel commands, which are passed on to the socket.
//  Returns 0 when the message was written, in which case msg_ is
//  re-initialised. Returns -1 with errno set to EAGAIN when there is no
//  data pipe or the pipe refused the write.
int zmq::session_base_t::push_msg (msg_t *msg_)
{
    //  Only subscribe/cancel commands travel further; other commands stop
    //  here.
    const bool is_command = (msg_->flags () & msg_t::command) != 0;
    if (is_command && !msg_->is_subscribe () && !msg_->is_cancel ())
        return 0;

    if (!_pipe || !_pipe->write (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    //  The message was accepted by the pipe; reset the caller's message
    //  object.
    int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

180 181
//  Reads one message from the ZAP pipe into msg_.
//  Returns 0 on success, -1 with errno ENOTCONN when no ZAP pipe exists,
//  or -1 with errno EAGAIN when the ZAP pipe has nothing to read.
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
    if (!_zap_pipe) {
        errno = ENOTCONN;
        return -1;
    }

    const bool got_message = _zap_pipe->read (msg_);
    if (!got_message) {
        errno = EAGAIN;
        return -1;
    }

    return 0;
}

//  Writes msg_ to the ZAP pipe.
//  Returns -1 with errno ENOTCONN when there is no ZAP pipe or the write
//  is refused. Otherwise the pipe is flushed once a message without the
//  'more' flag has been written, msg_ is re-initialised and 0 is returned.
int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
    const bool written = _zap_pipe != NULL && _zap_pipe->write (msg_);
    if (!written) {
        errno = ENOTCONN;
        return -1;
    }

    //  Flush only after the final frame of a (possibly multi-part) message.
    const bool more_follows = (msg_->flags () & msg_t::more) != 0;
    if (!more_follows) {
        _zap_pipe->flush ();
    }

    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
210 211 212 213
//  Hook called by reconnect () before a new connection attempt is started.
//  This implementation does nothing.
//  NOTE(review): presumably overridden by specialised session classes to
//  reset their own state -- confirm against the header.
void zmq::session_base_t::reset ()
{
}

214
void zmq::session_base_t::flush ()
215
{
216 217
    if (_pipe)
        _pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
218 219
}

220 221 222 223 224 225
//  Rolls back the data pipe. Does nothing when no data pipe is attached.
void zmq::session_base_t::rollback ()
{
    if (!_pipe)
        return;

    _pipe->rollback ();
}

226
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
227
{
228
    zmq_assert (_pipe != NULL);
229

Martin Hurton's avatar
Martin Hurton committed
230 231
    //  Get rid of half-processed messages in the out pipe. Flush any
    //  unflushed messages upstream.
232 233
    _pipe->rollback ();
    _pipe->flush ();
234

Martin Hurton's avatar
Martin Hurton committed
235
    //  Remove any half-read message from the in pipe.
236
    while (_incomplete_in) {
Martin Hurton's avatar
Martin Hurton committed
237 238 239 240 241 242 243
        msg_t msg;
        int rc = msg.init ();
        errno_assert (rc == 0);
        rc = pull_msg (&msg);
        errno_assert (rc == 0);
        rc = msg.close ();
        errno_assert (rc == 0);
244
    }
245 246
}

247
//  Pipe event: pipe_ has been terminated and the session must forget it.
//  pipe_ has to be the data pipe, the ZAP pipe or a member of
//  _terminating_pipes (asserted). For raw sockets the session shuts itself
//  down at this point. If a delayed termination was waiting for the pipes
//  to go away, it is completed once the last one is gone.
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
{
    // Drop the reference to the deallocated pipe if required.
    zmq_assert (pipe_ == _pipe || pipe_ == _zap_pipe
                || _terminating_pipes.count (pipe_) == 1);

    if (pipe_ == _pipe) {
        //  The current data pipe is gone; a linger timer guarding it is
        //  no longer needed.
        _pipe = NULL;
        if (_has_linger_timer) {
            cancel_timer (linger_timer_id);
            _has_linger_timer = false;
        }
    } else if (pipe_ == _zap_pipe) {
        _zap_pipe = NULL;
    } else {
        //  One of the previously detached pipes has finished terminating.
        _terminating_pipes.erase (pipe_);
    }

    //  Raw sockets: losing a pipe ends the session, unless termination is
    //  already under way.
    if (!is_terminating () && options.raw_socket) {
        if (_engine != NULL) {
            _engine->terminate ();
            _engine = NULL;
        }
        terminate ();
    }

    //  A pending termination was waiting for outstanding messages. With
    //  every pipe gone no more messages can arrive, so it can be finished.
    const bool all_pipes_gone =
      !_pipe && !_zap_pipe && _terminating_pipes.empty ();
    if (_pending && all_pipes_gone) {
        _pending = false;
        own_t::process_term (0);
    }
}

283
//  Pipe event: pipe_ has become readable.
//  pipe_ is expected to be the data pipe, the ZAP pipe, or a pipe that is
//  being torn down (a member of _terminating_pipes); notifications for the
//  latter are ignored. With an engine attached, the engine is told to
//  restart output (data pipe) or that a ZAP message is available (ZAP
//  pipe).
void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
    // Skip activating if we're detaching this pipe
    if (unlikely (pipe_ != _pipe && pipe_ != _zap_pipe)) {
        zmq_assert (_terminating_pipes.count (pipe_) == 1);
        return;
    }

    //  Without an engine nobody reads from the data pipe, so it is only
    //  checked for readable content here. The notification may be for the
    //  ZAP pipe while the data pipe is already gone (reconnect () and
    //  pipe_terminated () both reset _pipe to NULL), so _pipe must be
    //  tested before it is dereferenced.
    if (unlikely (_engine == NULL)) {
        if (_pipe)
            _pipe->check_read ();
        return;
    }

    if (likely (pipe_ == _pipe))
        _engine->restart_output ();
    else {
        // i.e. pipe_ == zap_pipe
        _engine->zap_msg_available ();
    }
}

304
//  Pipe event: pipe_ has become writable.
//  Notifications for anything but the current data pipe are ignored; such
//  a pipe must be a member of _terminating_pipes (asserted). Otherwise the
//  engine, if attached, is told to restart input.
void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
    //  A pipe that is being detached needs no activation.
    if (pipe_ != _pipe) {
        zmq_assert (_terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (_engine != NULL) {
        _engine->restart_input ();
    }
}

316
//  Pipe event that must never reach a session: hiccups travel from the
//  session to the socket only, never in the opposite direction. Getting
//  here therefore trips an assertion.
void zmq::session_base_t::hiccuped (pipe_t *)
{
    zmq_assert (false);
}

323
//  Returns the socket pointer that was passed to the constructor.
zmq::socket_base_t *zmq::session_base_t::get_socket ()
{
    socket_base_t *const owner_socket = _socket;
    return owner_socket;
}

328
void zmq::session_base_t::process_plug ()
329
{
330
    if (_active)
331
        start_connecting (false);
332 333
}

334 335 336 337 338 339
//  This functions can return 0 on success or -1 and errno=ECONNREFUSED if ZAP
//  is not setup (IE: inproc://zeromq.zap.01 does not exist in the same context)
//  or it aborts on any other error. In other words, either ZAP is not
//  configured or if it is configured it MUST be configured correctly and it
//  MUST work, otherwise authentication cannot be guaranteed and it would be a
//  security flaw.
340 341
int zmq::session_base_t::zap_connect ()
{
342
    if (_zap_pipe != NULL)
343
        return 0;
344 345 346 347 348 349

    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
    if (peer.socket == NULL) {
        errno = ECONNREFUSED;
        return -1;
    }
350 351
    zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER
                || peer.options.type == ZMQ_SERVER);
352 353 354

    //  Create a bi-directional pipe that will connect
    //  session with zap socket.
355 356 357 358
    object_t *parents[2] = {this, peer.socket};
    pipe_t *new_pipes[2] = {NULL, NULL};
    int hwms[2] = {0, 0};
    bool conflates[2] = {false, false};
Ian Barber's avatar
Ian Barber committed
359
    int rc = pipepair (parents, new_pipes, hwms, conflates);
360 361 362
    errno_assert (rc == 0);

    //  Attach local end of the pipe to this socket object.
363 364 365
    _zap_pipe = new_pipes[0];
    _zap_pipe->set_nodelay ();
    _zap_pipe->set_event_sink (this);
366

367
    send_bind (peer.socket, new_pipes[1], false);
368

369 370
    //  Send empty routing id if required by the peer.
    if (peer.options.recv_routing_id) {
371 372 373
        msg_t id;
        rc = id.init ();
        errno_assert (rc == 0);
374
        id.set_flags (msg_t::routing_id);
375
        bool ok = _zap_pipe->write (&id);
376
        zmq_assert (ok);
377
        _zap_pipe->flush ();
378 379 380 381 382
    }

    return 0;
}

Min RK's avatar
Min RK committed
383 384
bool zmq::session_base_t::zap_enabled ()
{
385
    return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
Min RK's avatar
Min RK committed
386 387
}

388
//  Attaches engine_ to the session. When the session has no data pipe yet
//  and is not terminating, a pipe pair to the socket is created first: the
//  local end is kept by the session and the remote end is sent to the
//  socket. The engine is then plugged into the session's I/O thread.
//  engine_ must not be NULL and no engine may be attached already (both
//  asserted).
void zmq::session_base_t::process_attach (i_engine *engine_)
{
    zmq_assert (engine_ != NULL);

    //  Set up the data pipe on first use.
    if (!_pipe && !is_terminating ()) {
        object_t *parents[2] = {this, _socket};
        pipe_t *pipes[2] = {NULL, NULL};

        //  With conflation in effect both directions use -1 as the
        //  high-water mark instead of the configured limits.
        const bool conflate = get_effective_conflate_option (options);
        const int inbound_hwm = conflate ? -1 : options.rcvhwm;
        const int outbound_hwm = conflate ? -1 : options.sndhwm;

        int hwms[2] = {inbound_hwm, outbound_hwm};
        bool conflates[2] = {conflate, conflate};
        int rc = pipepair (parents, pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  The local end reports its events to this session ...
        pipes[0]->set_event_sink (this);

        //  ... and becomes the session's data pipe.
        zmq_assert (!_pipe);
        _pipe = pipes[0];

        //  The socket gets the remote end.
        send_bind (_socket, pipes[1]);
    }

    //  Finally plug the engine in.
    zmq_assert (!_engine);
    _engine = engine_;
    _engine->plug (_io_thread, this);
}

422
//  Called when the engine has failed; reason_ says why (connection error,
//  timeout or protocol error -- asserted). The engine pointer is dropped
//  and half-done messages are removed from the data pipe. An active session
//  reconnects after a connection error or timeout. In every other case the
//  session either terminates, or, when termination is already pending,
//  terminates its pipes immediately.
void zmq::session_base_t::engine_error (
  zmq::stream_engine_t::error_reason_t reason_)
{
    //  The engine is dead; forget about it.
    _engine = NULL;

    //  Bring the data pipe back to a message boundary.
    if (_pipe)
        clean_pipes ();

    zmq_assert (reason_ == stream_engine_t::connection_error
                || reason_ == stream_engine_t::timeout_error
                || reason_ == stream_engine_t::protocol_error);

    //  Timeouts are handled exactly like connection errors.
    const bool connection_lost =
      reason_ == stream_engine_t::timeout_error
      || reason_ == stream_engine_t::connection_error;

    if (connection_lost && _active) {
        reconnect ();
    } else if (connection_lost
               || reason_ == stream_engine_t::protocol_error) {
        //  A lost connection on a non-active session is treated the same
        //  way as a protocol error.
        if (_pending) {
            if (_pipe)
                _pipe->terminate (false);
            if (_zap_pipe)
                _zap_pipe->terminate (false);
        } else {
            terminate ();
        }
    }

    //  Just in case there's only a delimiter in the pipe.
    if (_pipe)
        _pipe->check_read ();

    if (_zap_pipe)
        _zap_pipe->check_read ();
}

465
void zmq::session_base_t::process_term (int linger_)
466
{
467
    zmq_assert (!_pending);
468 469 470

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
Ian Barber's avatar
Ian Barber committed
471
    //  standard termination immediately.
472
    if (!_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
Martin Hurton's avatar
Martin Hurton committed
473
        own_t::process_term (0);
474 475 476
        return;
    }

477
    _pending = true;
478

479
    if (_pipe != NULL) {
480 481 482 483
        //  If there's finite linger value, delay the termination.
        //  If linger is infinite (negative) we don't even have to set
        //  the timer.
        if (linger_ > 0) {
484
            zmq_assert (!_has_linger_timer);
485
            add_timer (linger_, linger_timer_id);
486
            _has_linger_timer = true;
487 488 489 490
        }

        //  Start pipe termination process. Delay the termination till all messages
        //  are processed in case the linger time is non-zero.
491
        _pipe->terminate (linger_ != 0);
492

493 494 495
        //  TODO: Should this go into pipe_t::terminate ?
        //  In case there's no engine and there's only delimiter in the
        //  pipe it wouldn't be ever read. Thus we check for it explicitly.
496 497
        if (!_engine)
            _pipe->check_read ();
498
    }
499

500 501
    if (_zap_pipe != NULL)
        _zap_pipe->terminate (false);
502 503
}

504
void zmq::session_base_t::timer_event (int id_)
505 506 507 508
{
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
509
    _has_linger_timer = false;
510 511

    //  Ask pipe to terminate even though there may be pending messages in it.
512 513
    zmq_assert (_pipe);
    _pipe->terminate (false);
514 515
}

Martin Hurton's avatar
Martin Hurton committed
516
void zmq::session_base_t::reconnect ()
517
{
518 519
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
520 521
    if (_pipe && options.immediate == 1 && _addr->protocol != "pgm"
        && _addr->protocol != "epgm" && _addr->protocol != "norm"
522
        && _addr->protocol != protocol_name::udp) {
523 524 525 526 527 528
        _pipe->hiccup ();
        _pipe->terminate (false);
        _terminating_pipes.insert (_pipe);
        _pipe = NULL;

        if (_has_linger_timer) {
529
            cancel_timer (linger_timer_id);
530
            _has_linger_timer = false;
531
        }
532 533
    }

534
    reset ();
535

536
    //  Reconnect.
Sergey KHripchenko's avatar
Sergey KHripchenko committed
537 538
    if (options.reconnect_ivl != -1)
        start_connecting (true);
539 540
    else {
        std::string *ep = new (std::string);
541 542
        _addr->to_string (*ep);
        send_term_endpoint (_socket, ep);
543
    }
544

545 546
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
547
    if (_pipe
548 549
        && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
            || options.type == ZMQ_DISH))
550
        _pipe->hiccup ();
551 552
}

553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597
//  Table of (protocol name, member function) pairs for the transports that
//  are connected through a connecter object. tcp is always present; ipc is
//  left out on Windows, OpenVMS and VxWorks; tipc and vmci are present only
//  when the corresponding ZMQ_HAVE_* macro is defined.
zmq::session_base_t::connecter_factory_entry_t
  zmq::session_base_t::_connecter_factories[] = {
    std::make_pair (protocol_name::tcp,
                    &zmq::session_base_t::create_connecter_tcp),
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
    std::make_pair (protocol_name::ipc,
                    &zmq::session_base_t::create_connecter_ipc),
#endif
#if defined ZMQ_HAVE_TIPC
    std::make_pair (protocol_name::tipc,
                    &zmq::session_base_t::create_connecter_tipc),
#endif
#if defined ZMQ_HAVE_VMCI
    std::make_pair (protocol_name::vmci,
                    &zmq::session_base_t::create_connecter_vmci),
#endif
};

//  Lookup map built from the iterator range spanning the whole table above
//  (begin pointer, begin pointer + element count). start_connecting ()
//  searches it by protocol name.
zmq::session_base_t::connecter_factory_map_t
  zmq::session_base_t::_connecter_factories_map (
    _connecter_factories,
    _connecter_factories
      + sizeof (_connecter_factories) / sizeof (_connecter_factories[0]));

//  Table of (protocol name, member function) pairs for the transports that
//  are started directly by the session instead of through a connecter
//  object. udp is always present; pgm/epgm and norm are present only when
//  OpenPGM or NORM support is compiled in. pgm and epgm share one handler.
zmq::session_base_t::start_connecting_entry_t
  zmq::session_base_t::_start_connecting_entries[] = {
    std::make_pair (protocol_name::udp,
                    &zmq::session_base_t::start_connecting_udp),
#if defined ZMQ_HAVE_OPENPGM
    std::make_pair ("pgm", &zmq::session_base_t::start_connecting_pgm),
    std::make_pair ("epgm", &zmq::session_base_t::start_connecting_pgm),
#endif
#if defined ZMQ_HAVE_NORM
    std::make_pair ("norm", &zmq::session_base_t::start_connecting_norm),
#endif
};

//  Lookup map built from the iterator range spanning the whole table above.
//  start_connecting () consults it when the protocol is not found in
//  _connecter_factories_map.
zmq::session_base_t::start_connecting_map_t
  zmq::session_base_t::_start_connecting_map (
    _start_connecting_entries,
    _start_connecting_entries
      + sizeof (_start_connecting_entries)
          / sizeof (_start_connecting_entries[0]));

598
void zmq::session_base_t::start_connecting (bool wait_)
599
{
600
    zmq_assert (_active);
601 602 603 604 605 606 607

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.
608 609 610 611 612 613 614 615 616
    const connecter_factory_map_t::const_iterator connecter_factories_it =
      _connecter_factories_map.find (_addr->protocol);
    if (connecter_factories_it != _connecter_factories_map.end ()) {
        own_t *connecter =
          (this->*connecter_factories_it->second) (io_thread, wait_);

        alloc_assert (connecter);
        launch_child (connecter);
        return;
617
    }
618 619 620 621 622
    const start_connecting_map_t::const_iterator start_connecting_it =
      _start_connecting_map.find (_addr->protocol);
    if (start_connecting_it != _start_connecting_map.end ()) {
        (this->*start_connecting_it->second) (io_thread);
        return;
623
    }
624 625 626 627 628 629 630 631 632 633 634

    zmq_assert (false);
}

#if defined ZMQ_HAVE_VMCI
//  Connecter factory for the vmci transport. Allocates the connecter with
//  nothrow new and returns it; the result is NULL when allocation fails.
zmq::own_t *zmq::session_base_t::create_connecter_vmci (io_thread_t *io_thread_,
                                                        bool wait_)
{
    vmci_connecter_t *const connecter = new (std::nothrow)
      vmci_connecter_t (io_thread_, this, options, _addr, wait_);
    return connecter;
}
#endif
636

637
#if defined ZMQ_HAVE_TIPC
//  Connecter factory for the tipc transport. Allocates the connecter with
//  nothrow new and returns it; the result is NULL when allocation fails.
zmq::own_t *zmq::session_base_t::create_connecter_tipc (io_thread_t *io_thread_,
                                                        bool wait_)
{
    tipc_connecter_t *const connecter = new (std::nothrow)
      tipc_connecter_t (io_thread_, this, options, _addr, wait_);
    return connecter;
}
#endif
645 646 647 648 649 650 651 652 653

#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
//  Connecter factory for the ipc transport (not built on Windows, OpenVMS
//  or VxWorks). Allocates the connecter with nothrow new and returns it;
//  the result is NULL when allocation fails.
zmq::own_t *zmq::session_base_t::create_connecter_ipc (io_thread_t *io_thread_,
                                                       bool wait_)
{
    ipc_connecter_t *const connecter = new (std::nothrow)
      ipc_connecter_t (io_thread_, this, options, _addr, wait_);
    return connecter;
}
#endif
655

656 657 658 659 660 661 662 663 664 665 666 667 668
//  Connecter factory for the tcp transport. Without a SOCKS proxy address
//  in the options a plain TCP connecter is created. Otherwise an address
//  object for the proxy is allocated (asserted) and passed, together with
//  the session's own address, to a SOCKS connecter. The connecter itself
//  is allocated with nothrow new, so NULL is returned if that fails.
zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_,
                                                       bool wait_)
{
    //  Direct connection.
    if (options.socks_proxy_address.empty ()) {
        return new (std::nothrow)
          tcp_connecter_t (io_thread_, this, options, _addr, wait_);
    }

    //  Connection through the configured SOCKS proxy.
    address_t *proxy_address = new (std::nothrow) address_t (
      protocol_name::tcp, options.socks_proxy_address, this->get_ctx ());
    alloc_assert (proxy_address);

    return new (std::nothrow) socks_connecter_t (
      io_thread_, this, options, _addr, proxy_address, wait_);
}
669

670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688
#ifdef ZMQ_HAVE_OPENPGM
//  Starts the pgm/epgm transport. Only PUB, XPUB, SUB and XSUB sockets are
//  allowed (asserted). Publishers get a PGM sender, subscribers a PGM
//  receiver. The engine is created and initialised straight away and then
//  attached to this session, because PGM has no notion of 'connect' that
//  would be worth waiting for.
void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_)
{
    zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
                || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

    //  "epgm" selects PGM with UDP encapsulation.
    const bool udp_encapsulation = _addr->protocol == "epgm";

    const bool is_publisher =
      options.type == ZMQ_PUB || options.type == ZMQ_XPUB;

    if (is_publisher) {
        //  Sending side.
        pgm_sender_t *pgm_sender =
          new (std::nothrow) pgm_sender_t (io_thread_, options);
        alloc_assert (pgm_sender);

        int rc = pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
        errno_assert (rc == 0);

        send_attach (this, pgm_sender);
    } else {
        //  Receiving side.
        pgm_receiver_t *pgm_receiver =
          new (std::nothrow) pgm_receiver_t (io_thread_, options);
        alloc_assert (pgm_receiver);

        int rc =
          pgm_receiver->init (udp_encapsulation, _addr->address.c_str ());
        errno_assert (rc == 0);

        send_attach (this, pgm_receiver);
    }
}
#endif
706

707 708 709 710 711 712 713 714 715 716 717 718 719 720
#ifdef ZMQ_HAVE_NORM
//  Starts the norm transport. PUB and XPUB sockets get a NORM engine
//  initialised as sender, every other socket type (expected to be SUB or
//  XSUB) one initialised as receiver. The engine is created and attached
//  to this session straight away, because NORM has no notion of 'connect'
//  that would be worth waiting for.
void zmq::session_base_t::start_connecting_norm (io_thread_t *io_thread_)
{
    const bool is_publisher =
      options.type == ZMQ_PUB || options.type == ZMQ_XPUB;

    if (is_publisher) {
        //  Sending side: init (address, send = true, receive = false).
        norm_engine_t *norm_sender =
          new (std::nothrow) norm_engine_t (io_thread_, options);
        alloc_assert (norm_sender);

        int rc = norm_sender->init (_addr->address.c_str (), true, false);
        errno_assert (rc == 0);

        send_attach (this, norm_sender);
    } else {
        //  Receiving side: init (address, send = false, receive = true).
        norm_engine_t *norm_receiver =
          new (std::nothrow) norm_engine_t (io_thread_, options);
        alloc_assert (norm_receiver);

        int rc = norm_receiver->init (_addr->address.c_str (), false, true);
        errno_assert (rc == 0);

        send_attach (this, norm_receiver);
    }
}
#endif
Martin Hurton's avatar
Martin Hurton committed
737

738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757
//  Starts the udp transport. Only DISH, RADIO and DGRAM sockets are allowed
//  (asserted). A UDP engine is created, initialised for the directions the
//  socket type needs and attached to this session. The I/O thread argument
//  is not used.
void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/)
{
    zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
                || options.type == ZMQ_DGRAM);

    udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
    alloc_assert (engine);

    //  RADIO only sends, DISH only receives, DGRAM does both.
    const bool is_radio = options.type == ZMQ_RADIO;
    const bool is_dish = options.type == ZMQ_DISH;
    const bool is_dgram = options.type == ZMQ_DGRAM;

    bool send_enabled = is_radio || is_dgram;
    bool recv_enabled = is_dish || is_dgram;

    const int rc = engine->init (_addr, send_enabled, recv_enabled);
    errno_assert (rc == 0);

    send_attach (this, engine);
}