/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#include "session_base.hpp"
33
#include "i_engine.hpp"
34
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "pipe.hpp"
36
#include "likely.hpp"
37
#include "tcp_connecter.hpp"
38
#include "ipc_connecter.hpp"
39
#include "tipc_connecter.hpp"
40
#include "socks_connecter.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
41
#include "vmci_connecter.hpp"
42 43
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
44
#include "address.hpp"
bebopagogo's avatar
bebopagogo committed
45
#include "norm_engine.hpp"
46
#include "udp_engine.hpp"
47

48
#include "ctx.hpp"
49
#include "req.hpp"
50 51
#include "radio.hpp"
#include "dish.hpp"
52 53

//  Factory for session objects. REQ, RADIO and DISH sockets get their
//  dedicated session classes; every other known socket type is served by
//  the plain session_base_t. An unrecognised socket type yields NULL with
//  errno set to EINVAL.
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
                                                  bool active_,
                                                  class socket_base_t *socket_,
                                                  const options_t &options_,
                                                  address_t *addr_)
{
    session_base_t *session = NULL;

    if (options_.type == ZMQ_REQ) {
        session = new (std::nothrow)
          req_session_t (io_thread_, active_, socket_, options_, addr_);
    } else if (options_.type == ZMQ_RADIO) {
        session = new (std::nothrow)
          radio_session_t (io_thread_, active_, socket_, options_, addr_);
    } else if (options_.type == ZMQ_DISH) {
        session = new (std::nothrow)
          dish_session_t (io_thread_, active_, socket_, options_, addr_);
    } else {
        switch (options_.type) {
            //  Socket types without a specialised session class.
            case ZMQ_DEALER:
            case ZMQ_REP:
            case ZMQ_ROUTER:
            case ZMQ_PUB:
            case ZMQ_XPUB:
            case ZMQ_SUB:
            case ZMQ_XSUB:
            case ZMQ_PUSH:
            case ZMQ_PULL:
            case ZMQ_PAIR:
            case ZMQ_STREAM:
            case ZMQ_SERVER:
            case ZMQ_CLIENT:
            case ZMQ_GATHER:
            case ZMQ_SCATTER:
            case ZMQ_DGRAM:
                session = new (std::nothrow) session_base_t (
                  io_thread_, active_, socket_, options_, addr_);
                break;
            default:
                errno = EINVAL;
                return NULL;
        }
    }

    //  Verify that the nothrow allocation above actually succeeded.
    alloc_assert (session);
    return session;
}

//  Constructs a session for socket_ that lives in io_thread_. The session
//  starts out with no data pipe, no ZAP pipe and no engine attached, and
//  with no linger timer armed. addr_ is stored as-is; the destructor
//  releases it via LIBZMQ_DELETE.
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
                                     bool active_,
                                     class socket_base_t *socket_,
                                     const options_t &options_,
                                     address_t *addr_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    _active (active_),
    _pipe (NULL),
    _zap_pipe (NULL),
    _incomplete_in (false),
    _pending (false),
    _engine (NULL),
    _socket (socket_),
    _io_thread (io_thread_),
    _has_linger_timer (false),
    _addr (addr_)
{
    //  Nothing else to do; all state is set up by the initializer list.
}

120
//  Returns the endpoint pair reported by the attached engine. The engine
//  pointer is dereferenced without a check, so the caller has to make sure
//  an engine is currently attached.
const zmq::endpoint_uri_pair_t &zmq::session_base_t::get_endpoint () const
{
    const endpoint_uri_pair_t &engine_endpoint = _engine->get_endpoint ();
    return engine_endpoint;
}

125
//  Destroys the session. Both pipes must already be detached at this point.
zmq::session_base_t::~session_base_t ()
{
    zmq_assert (!_pipe);
    zmq_assert (!_zap_pipe);

    //  A linger timer that is still armed has to be cancelled first.
    if (_has_linger_timer) {
        cancel_timer (linger_timer_id);
        _has_linger_timer = false;
    }

    //  Shut down the engine if one is still attached.
    if (_engine != NULL) {
        _engine->terminate ();
    }

    //  Release the address object handed over at construction time.
    LIBZMQ_DELETE (_addr);
}
142

143
//  Installs pipe_ as the session's data pipe and registers the session as
//  the sink for the pipe's events. Only valid while the session is not
//  terminating and has no data pipe yet.
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
{
    zmq_assert (!is_terminating ());
    zmq_assert (!_pipe);
    zmq_assert (pipe_);

    _pipe = pipe_;
    pipe_->set_event_sink (this);
}

152
//  Reads the next message from the data pipe into msg_.
//  Returns 0 on success. Returns -1 with errno set to EAGAIN when there is
//  no data pipe or the pipe has nothing to read.
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
    const bool got_message = _pipe != NULL && _pipe->read (msg_);
    if (!got_message) {
        errno = EAGAIN;
        return -1;
    }

    //  Remember whether further parts of this message are still to come,
    //  so that a half-read message can be discarded later on.
    _incomplete_in = (msg_->flags () & msg_t::more) ? true : false;
    return 0;
}

164
//  Writes msg_ to the data pipe.
//  Command messages other than subscribe/cancel are silently accepted and
//  not forwarded. Returns 0 on success, or -1 with errno set to EAGAIN when
//  there is no data pipe or the pipe refuses the write.
int zmq::session_base_t::push_msg (msg_t *msg_)
{
    //  Only subscribe/cancel commands are passed on to the socket.
    const bool is_command = (msg_->flags () & msg_t::command) != 0;
    if (is_command && !msg_->is_subscribe () && !msg_->is_cancel ())
        return 0;

    if (!_pipe || !_pipe->write (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    //  The write succeeded; re-initialise the caller's message object.
    int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

180 181
//  Reads one message from the ZAP pipe into msg_.
//  Returns 0 on success, -1 with errno ENOTCONN when no ZAP pipe exists,
//  or -1 with errno EAGAIN when the ZAP pipe has nothing to read.
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
    if (!_zap_pipe) {
        errno = ENOTCONN;
        return -1;
    }

    if (_zap_pipe->read (msg_))
        return 0;

    errno = EAGAIN;
    return -1;
}

//  Writes msg_ to the ZAP pipe and flushes the pipe once the final part of
//  a message (no 'more' flag) has been written.
//  Returns 0 on success, or -1 with errno ENOTCONN when there is no ZAP
//  pipe or the write is refused.
int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
    const bool written = _zap_pipe != NULL && _zap_pipe->write (msg_);
    if (!written) {
        errno = ENOTCONN;
        return -1;
    }

    const bool is_last_part = (msg_->flags () & msg_t::more) == 0;
    if (is_last_part) {
        _zap_pipe->flush ();
    }

    //  Re-initialise the caller's message object after the write.
    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
210 211 212 213
//  Hook called by reconnect () before a new connection attempt is made.
//  This implementation does nothing.
//  NOTE(review): presumably meant to be overridden by specialised session
//  classes - confirm against the class declaration.
void zmq::session_base_t::reset ()
{
}

214
void zmq::session_base_t::flush ()
215
{
216 217
    if (_pipe)
        _pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
218 219
}

220 221 222 223 224 225
void zmq::session_base_t::rollback ()
{
    if (_pipe)
        _pipe->rollback ();
}

226
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
227
{
228
    zmq_assert (_pipe != NULL);
229

Martin Hurton's avatar
Martin Hurton committed
230 231
    //  Get rid of half-processed messages in the out pipe. Flush any
    //  unflushed messages upstream.
232 233
    _pipe->rollback ();
    _pipe->flush ();
234

Martin Hurton's avatar
Martin Hurton committed
235
    //  Remove any half-read message from the in pipe.
236
    while (_incomplete_in) {
Martin Hurton's avatar
Martin Hurton committed
237 238 239 240 241 242 243
        msg_t msg;
        int rc = msg.init ();
        errno_assert (rc == 0);
        rc = pull_msg (&msg);
        errno_assert (rc == 0);
        rc = msg.close ();
        errno_assert (rc == 0);
244
    }
245 246
}

247
//  Pipe event: pipe_ has finished terminating. pipe_ must be the data pipe,
//  the ZAP pipe or one of the pipes recorded in _terminating_pipes.
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
{
    zmq_assert (pipe_ == _pipe || pipe_ == _zap_pipe
                || _terminating_pipes.count (pipe_) == 1);

    //  Forget the pipe, whichever role it had.
    if (pipe_ == _pipe) {
        _pipe = NULL;

        //  The linger timer is pointless once the data pipe is gone.
        if (_has_linger_timer) {
            cancel_timer (linger_timer_id);
            _has_linger_timer = false;
        }
    } else if (pipe_ == _zap_pipe) {
        _zap_pipe = NULL;
    } else {
        //  A pipe that was detached earlier (see reconnect ()).
        _terminating_pipes.erase (pipe_);
    }

    //  For raw sockets, losing a pipe while the session is not yet
    //  terminating shuts down the engine and the session itself.
    if (!is_terminating () && options.raw_socket) {
        if (_engine != NULL) {
            _engine->terminate ();
            _engine = NULL;
        }
        terminate ();
    }

    //  If termination was postponed until the pipes are gone, and this was
    //  the last one, carry on with the termination now.
    const bool no_pipes_left =
      !_pipe && !_zap_pipe && _terminating_pipes.empty ();
    if (_pending && no_pipes_left) {
        _pending = false;
        own_t::process_term (0);
    }
}

283
//  Pipe event: pipe_ has become readable.
//
//  pipe_ is expected to be the data pipe, the ZAP pipe, or a pipe that is
//  being detached (tracked in _terminating_pipes); activations from the
//  latter are ignored. With an engine attached, a data pipe activation
//  restarts the engine's output and a ZAP pipe activation tells the engine
//  that a ZAP message is available.
void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
    // Skip activating if we're detaching this pipe
    if (unlikely (pipe_ != _pipe && pipe_ != _zap_pipe)) {
        zmq_assert (_terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (unlikely (_engine == NULL)) {
        //  There is no engine to hand the data to, so just poll the data
        //  pipe. The activation may have come from the ZAP pipe while the
        //  data pipe is already gone, hence the NULL check.
        if (_pipe)
            _pipe->check_read ();
        return;
    }

    if (likely (pipe_ == _pipe))
        _engine->restart_output ();
    else {
        // i.e. pipe_ == zap_pipe
        _engine->zap_msg_available ();
    }
}

304
//  Pipe event: pipe_ has become writable. For the data pipe the engine's
//  input is restarted (if an engine is attached); any other pipe must be
//  one that is being detached and is ignored.
void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
    if (_pipe == pipe_) {
        if (_engine != NULL) {
            _engine->restart_input ();
        }
    } else {
        //  Not the current data pipe: it has to be a detached one.
        zmq_assert (_terminating_pipes.count (pipe_) == 1);
    }
}

316
//  Pipe event: hiccup. A session only ever sends hiccups towards the
//  socket and never receives one, so getting here is a failed assertion.
void zmq::session_base_t::hiccuped (pipe_t *)
{
    zmq_assert (false);
}

323
//  Returns the socket pointer this session was constructed with.
zmq::socket_base_t *zmq::session_base_t::get_socket ()
{
    socket_base_t *const owning_socket = _socket;
    return owning_socket;
}

328
void zmq::session_base_t::process_plug ()
329
{
330
    if (_active)
331
        start_connecting (false);
332 333
}

334 335 336 337 338 339
//  This functions can return 0 on success or -1 and errno=ECONNREFUSED if ZAP
//  is not setup (IE: inproc://zeromq.zap.01 does not exist in the same context)
//  or it aborts on any other error. In other words, either ZAP is not
//  configured or if it is configured it MUST be configured correctly and it
//  MUST work, otherwise authentication cannot be guaranteed and it would be a
//  security flaw.
340 341
int zmq::session_base_t::zap_connect ()
{
342
    if (_zap_pipe != NULL)
343
        return 0;
344 345 346 347 348 349

    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
    if (peer.socket == NULL) {
        errno = ECONNREFUSED;
        return -1;
    }
350 351
    zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER
                || peer.options.type == ZMQ_SERVER);
352 353 354

    //  Create a bi-directional pipe that will connect
    //  session with zap socket.
355 356 357 358
    object_t *parents[2] = {this, peer.socket};
    pipe_t *new_pipes[2] = {NULL, NULL};
    int hwms[2] = {0, 0};
    bool conflates[2] = {false, false};
Ian Barber's avatar
Ian Barber committed
359
    int rc = pipepair (parents, new_pipes, hwms, conflates);
360 361 362
    errno_assert (rc == 0);

    //  Attach local end of the pipe to this socket object.
363 364 365
    _zap_pipe = new_pipes[0];
    _zap_pipe->set_nodelay ();
    _zap_pipe->set_event_sink (this);
366

367
    send_bind (peer.socket, new_pipes[1], false);
368

369 370
    //  Send empty routing id if required by the peer.
    if (peer.options.recv_routing_id) {
371 372 373
        msg_t id;
        rc = id.init ();
        errno_assert (rc == 0);
374
        id.set_flags (msg_t::routing_id);
375
        bool ok = _zap_pipe->write (&id);
376
        zmq_assert (ok);
377
        _zap_pipe->flush ();
378 379 380 381 382
    }

    return 0;
}

Min RK's avatar
Min RK committed
383 384
bool zmq::session_base_t::zap_enabled ()
{
385
    return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
Min RK's avatar
Min RK committed
386 387
}

388
//  Attach command handler: takes engine_ as the session's engine and plugs
//  it in. If the session has no data pipe yet (and is not terminating), a
//  pipe pair towards the socket is created first.
void zmq::session_base_t::process_attach (i_engine *engine_)
{
    zmq_assert (engine_ != NULL);

    if (!_pipe && !is_terminating ()) {
        object_t *pipe_owners[2] = {this, _socket};
        pipe_t *new_pipes[2] = {NULL, NULL};

        //  With conflation in effect both high-water marks are passed as
        //  -1; otherwise the configured receive/send marks are used.
        const bool conflate = get_effective_conflate_option (options);
        int hwms[2] = {-1, -1};
        if (!conflate) {
            hwms[0] = options.rcvhwm;
            hwms[1] = options.sndhwm;
        }
        bool conflates[2] = {conflate, conflate};

        int rc = pipepair (pipe_owners, new_pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  The session receives the events of the local end ...
        new_pipes[0]->set_event_sink (this);

        //  ... and keeps it as its data pipe.
        zmq_assert (!_pipe);
        _pipe = new_pipes[0];

        //  Endpoint strings are not set on bind; set them on both ends
        //  here so that events can make use of them.
        new_pipes[0]->set_endpoint_pair (engine_->get_endpoint ());
        new_pipes[1]->set_endpoint_pair (engine_->get_endpoint ());

        //  The socket gets the remote end.
        send_bind (_socket, new_pipes[1]);
    }

    //  Finally take over the engine and plug it in.
    zmq_assert (!_engine);
    _engine = engine_;
    _engine->plug (_io_thread, this);
}

427
//  Called when the engine has failed. reason_ must be one of
//  connection_error, timeout_error or protocol_error.
//
//  An active session reconnects after a timeout or connection error. In
//  every other case the session gives up: it terminates its pipes if a
//  termination is already pending, or terminates itself otherwise.
void zmq::session_base_t::engine_error (
  zmq::stream_engine_t::error_reason_t reason_)
{
    //  The engine is gone; drop the pointer to it.
    _engine = NULL;

    //  Get rid of partially transferred messages.
    if (_pipe != NULL) {
        clean_pipes ();
    }

    zmq_assert (reason_ == stream_engine_t::connection_error
                || reason_ == stream_engine_t::timeout_error
                || reason_ == stream_engine_t::protocol_error);

    //  Decide between reconnecting and giving up.
    bool give_up = false;
    switch (reason_) {
        case stream_engine_t::timeout_error:
        case stream_engine_t::connection_error:
            if (_active)
                reconnect ();
            else
                give_up = true;
            break;
        case stream_engine_t::protocol_error:
            give_up = true;
            break;
    }

    if (give_up) {
        if (_pending) {
            if (_pipe)
                _pipe->terminate (false);
            if (_zap_pipe)
                _zap_pipe->terminate (false);
        } else {
            terminate ();
        }
    }

    //  Poll both pipes, in case only a delimiter is left in them.
    if (_pipe)
        _pipe->check_read ();

    if (_zap_pipe)
        _zap_pipe->check_read ();
}

470
void zmq::session_base_t::process_term (int linger_)
471
{
472
    zmq_assert (!_pending);
473 474 475

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
Ian Barber's avatar
Ian Barber committed
476
    //  standard termination immediately.
477
    if (!_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
Martin Hurton's avatar
Martin Hurton committed
478
        own_t::process_term (0);
479 480 481
        return;
    }

482
    _pending = true;
483

484
    if (_pipe != NULL) {
485 486 487 488
        //  If there's finite linger value, delay the termination.
        //  If linger is infinite (negative) we don't even have to set
        //  the timer.
        if (linger_ > 0) {
489
            zmq_assert (!_has_linger_timer);
490
            add_timer (linger_, linger_timer_id);
491
            _has_linger_timer = true;
492 493 494 495
        }

        //  Start pipe termination process. Delay the termination till all messages
        //  are processed in case the linger time is non-zero.
496
        _pipe->terminate (linger_ != 0);
497

498 499 500
        //  TODO: Should this go into pipe_t::terminate ?
        //  In case there's no engine and there's only delimiter in the
        //  pipe it wouldn't be ever read. Thus we check for it explicitly.
501 502
        if (!_engine)
            _pipe->check_read ();
503
    }
504

505 506
    if (_zap_pipe != NULL)
        _zap_pipe->terminate (false);
507 508
}

509
void zmq::session_base_t::timer_event (int id_)
510 511 512 513
{
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
514
    _has_linger_timer = false;
515 516

    //  Ask pipe to terminate even though there may be pending messages in it.
517 518
    zmq_assert (_pipe);
    _pipe->terminate (false);
519 520
}

Martin Hurton's avatar
Martin Hurton committed
521
void zmq::session_base_t::reconnect ()
522
{
523 524
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
525 526
    if (_pipe && options.immediate == 1 && _addr->protocol != "pgm"
        && _addr->protocol != "epgm" && _addr->protocol != "norm"
527
        && _addr->protocol != protocol_name::udp) {
528 529 530 531 532 533
        _pipe->hiccup ();
        _pipe->terminate (false);
        _terminating_pipes.insert (_pipe);
        _pipe = NULL;

        if (_has_linger_timer) {
534
            cancel_timer (linger_timer_id);
535
            _has_linger_timer = false;
536
        }
537 538
    }

539
    reset ();
540

541
    //  Reconnect.
Sergey KHripchenko's avatar
Sergey KHripchenko committed
542 543
    if (options.reconnect_ivl != -1)
        start_connecting (true);
544 545
    else {
        std::string *ep = new (std::string);
546 547
        _addr->to_string (*ep);
        send_term_endpoint (_socket, ep);
548
    }
549

550 551
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
552
    if (_pipe
553 554
        && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
            || options.type == ZMQ_DISH))
555
        _pipe->hiccup ();
556 557
}

558 559
//  Protocols that are connected through a dedicated connecter object,
//  paired with the member function creating that connecter. Entries for
//  ipc, tipc and vmci are only present when the platform/build supports
//  them.
zmq::session_base_t::connecter_factory_entry_t
  zmq::session_base_t::_connecter_factories[] = {
    connecter_factory_entry_t (protocol_name::tcp,
                               &zmq::session_base_t::create_connecter_tcp),
#if !(defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS)
    connecter_factory_entry_t (protocol_name::ipc,
                               &zmq::session_base_t::create_connecter_ipc),
#endif
#ifdef ZMQ_HAVE_TIPC
    connecter_factory_entry_t (protocol_name::tipc,
                               &zmq::session_base_t::create_connecter_tipc),
#endif
#ifdef ZMQ_HAVE_VMCI
    connecter_factory_entry_t (protocol_name::vmci,
                               &zmq::session_base_t::create_connecter_vmci),
#endif
};

//  Lookup map built from the whole _connecter_factories table
//  (iterator range [first entry, one past the last entry)).
zmq::session_base_t::connecter_factory_map_t
  zmq::session_base_t::_connecter_factories_map (
    &_connecter_factories[0],
    &_connecter_factories[0]
      + sizeof (_connecter_factories) / sizeof (*_connecter_factories));

//  Protocols that are started by a dedicated start_connecting_* member
//  function instead of a connecter object. The pgm/epgm and norm entries
//  are only present when the build supports them.
zmq::session_base_t::start_connecting_entry_t
  zmq::session_base_t::_start_connecting_entries[] = {
    start_connecting_entry_t (protocol_name::udp,
                              &zmq::session_base_t::start_connecting_udp),
#ifdef ZMQ_HAVE_OPENPGM
    start_connecting_entry_t ("pgm",
                              &zmq::session_base_t::start_connecting_pgm),
    start_connecting_entry_t ("epgm",
                              &zmq::session_base_t::start_connecting_pgm),
#endif
#ifdef ZMQ_HAVE_NORM
    start_connecting_entry_t ("norm",
                              &zmq::session_base_t::start_connecting_norm),
#endif
};

//  Lookup map built from the whole _start_connecting_entries table
//  (iterator range [first entry, one past the last entry)).
zmq::session_base_t::start_connecting_map_t
  zmq::session_base_t::_start_connecting_map (
    &_start_connecting_entries[0],
    &_start_connecting_entries[0]
      + sizeof (_start_connecting_entries)
          / sizeof (*_start_connecting_entries));

606
void zmq::session_base_t::start_connecting (bool wait_)
607
{
608
    zmq_assert (_active);
609 610 611 612 613 614 615

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.
616 617 618 619 620 621 622 623 624
    const connecter_factory_map_t::const_iterator connecter_factories_it =
      _connecter_factories_map.find (_addr->protocol);
    if (connecter_factories_it != _connecter_factories_map.end ()) {
        own_t *connecter =
          (this->*connecter_factories_it->second) (io_thread, wait_);

        alloc_assert (connecter);
        launch_child (connecter);
        return;
625
    }
626 627 628 629 630
    const start_connecting_map_t::const_iterator start_connecting_it =
      _start_connecting_map.find (_addr->protocol);
    if (start_connecting_it != _start_connecting_map.end ()) {
        (this->*start_connecting_it->second) (io_thread);
        return;
631
    }
632 633 634 635 636 637 638 639 640 641 642

    zmq_assert (false);
}

#ifdef ZMQ_HAVE_VMCI
//  Creates a VMCI connecter for this session's address. The result is NULL
//  when the allocation fails; the caller checks for that.
zmq::own_t *zmq::session_base_t::create_connecter_vmci (io_thread_t *io_thread_,
                                                        bool wait_)
{
    own_t *connecter = new (std::nothrow)
      vmci_connecter_t (io_thread_, this, options, _addr, wait_);
    return connecter;
}
#endif
644

645
#ifdef ZMQ_HAVE_TIPC
//  Creates a TIPC connecter for this session's address. The result is NULL
//  when the allocation fails; the caller checks for that.
zmq::own_t *zmq::session_base_t::create_connecter_tipc (io_thread_t *io_thread_,
                                                        bool wait_)
{
    own_t *connecter = new (std::nothrow)
      tipc_connecter_t (io_thread_, this, options, _addr, wait_);
    return connecter;
}
#endif
653 654 655 656 657 658 659 660 661

#if !(defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS)
//  Creates an IPC connecter for this session's address. The result is NULL
//  when the allocation fails; the caller checks for that.
zmq::own_t *zmq::session_base_t::create_connecter_ipc (io_thread_t *io_thread_,
                                                       bool wait_)
{
    own_t *connecter = new (std::nothrow)
      ipc_connecter_t (io_thread_, this, options, _addr, wait_);
    return connecter;
}
#endif
663

664 665 666 667 668 669 670
//  Creates the connecter for a TCP address: a SOCKS connecter when a SOCKS
//  proxy address is configured, a plain TCP connecter otherwise. In the
//  plain TCP case the result is NULL when the allocation fails.
zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_,
                                                       bool wait_)
{
    //  No proxy configured: connect directly.
    if (options.socks_proxy_address.empty ()) {
        return new (std::nothrow)
          tcp_connecter_t (io_thread_, this, options, _addr, wait_);
    }

    //  Address object describing the proxy itself.
    address_t *proxy_address = new (std::nothrow) address_t (
      protocol_name::tcp, options.socks_proxy_address, this->get_ctx ());
    alloc_assert (proxy_address);

    socks_connecter_t *socks_connecter = new (std::nothrow) socks_connecter_t (
      io_thread_, this, options, _addr, proxy_address, wait_);
    alloc_assert (socks_connecter);

    //  Basic authentication is only set up when a username is configured.
    const bool has_username = !options.socks_proxy_username.empty ();
    if (has_username) {
        socks_connecter->set_auth_method_basic (options.socks_proxy_username,
                                                options.socks_proxy_password);
    }
    return socks_connecter;
}
683

684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702
#ifdef ZMQ_HAVE_OPENPGM
//  Starts a pgm/epgm "connection". Only PUB, XPUB, SUB and XSUB sockets
//  are allowed. The engine is created and attached to the session straight
//  away, since PGM has no concept of 'connect' that could be waited for.
void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_)
{
    zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
                || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

    //  The epgm transport is PGM with UDP encapsulation.
    bool const udp_encapsulation = _addr->protocol == "epgm";

    const bool is_publisher =
      options.type == ZMQ_PUB || options.type == ZMQ_XPUB;

    if (!is_publisher) {
        //  Subscribing side: PGM receiver.
        pgm_receiver_t *receiver =
          new (std::nothrow) pgm_receiver_t (io_thread_, options);
        alloc_assert (receiver);

        int rc = receiver->init (udp_encapsulation, _addr->address.c_str ());
        errno_assert (rc == 0);

        send_attach (this, receiver);
        return;
    }

    //  Publishing side: PGM sender.
    pgm_sender_t *sender =
      new (std::nothrow) pgm_sender_t (io_thread_, options);
    alloc_assert (sender);

    int rc = sender->init (udp_encapsulation, _addr->address.c_str ());
    errno_assert (rc == 0);

    send_attach (this, sender);
}
#endif
720

721 722 723 724 725 726 727 728 729 730 731 732 733 734
#ifdef ZMQ_HAVE_NORM
//  Starts a norm "connection". The engine is created and attached to the
//  session straight away, since NORM has no concept of 'connect' that
//  could be waited for. PUB/XPUB sockets get a sending engine, every other
//  socket type (expected: SUB or XSUB) gets a receiving one.
void zmq::session_base_t::start_connecting_norm (io_thread_t *io_thread_)
{
    const bool is_publisher =
      options.type == ZMQ_PUB || options.type == ZMQ_XPUB;

    norm_engine_t *norm_engine =
      new (std::nothrow) norm_engine_t (io_thread_, options);
    alloc_assert (norm_engine);

    //  The two flags select the direction: (true, false) for the sender,
    //  (false, true) for the receiver.
    int rc = is_publisher
               ? norm_engine->init (_addr->address.c_str (), true, false)
               : norm_engine->init (_addr->address.c_str (), false, true);
    errno_assert (rc == 0);

    send_attach (this, norm_engine);
}
#endif
Martin Hurton's avatar
Martin Hurton committed
751

752 753 754 755 756 757 758 759
//  Starts a udp "connection" by creating a UDP engine and attaching it to
//  the session. Only DISH, RADIO and DGRAM sockets are allowed. The I/O
//  thread argument is unused.
void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/)
{
    zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
                || options.type == ZMQ_DGRAM);

    udp_engine_t *udp_engine = new (std::nothrow) udp_engine_t (options);
    alloc_assert (udp_engine);

    //  DISH only receives, RADIO only sends, DGRAM does both.
    const bool is_dgram = options.type == ZMQ_DGRAM;
    const bool can_recv = is_dgram || options.type == ZMQ_DISH;
    const bool can_send = is_dgram || options.type == ZMQ_RADIO;

    const int rc = udp_engine->init (_addr, can_send, can_recv);
    errno_assert (rc == 0);

    send_attach (this, udp_engine);
}