session_base.cpp 23.4 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#include "session_base.hpp"
33
#include "i_engine.hpp"
34
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "pipe.hpp"
36
#include "likely.hpp"
37
#include "tcp_connecter.hpp"
38
#include "ws_connecter.hpp"
39
#include "ipc_connecter.hpp"
40
#include "tipc_connecter.hpp"
41
#include "socks_connecter.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
42
#include "vmci_connecter.hpp"
43 44
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
45
#include "address.hpp"
bebopagogo's avatar
bebopagogo committed
46
#include "norm_engine.hpp"
47
#include "udp_engine.hpp"
48

49
#include "ctx.hpp"
50
#include "req.hpp"
51 52
#include "radio.hpp"
#include "dish.hpp"
53 54

zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
55 56 57 58
                                                  bool active_,
                                                  class socket_base_t *socket_,
                                                  const options_t &options_,
                                                  address_t *addr_)
59 60 61
{
    session_base_t *s = NULL;
    switch (options_.type) {
62 63 64
        case ZMQ_REQ:
            s = new (std::nothrow)
              req_session_t (io_thread_, active_, socket_, options_, addr_);
65
            break;
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
        case ZMQ_RADIO:
            s = new (std::nothrow)
              radio_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_DISH:
            s = new (std::nothrow)
              dish_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_DEALER:
        case ZMQ_REP:
        case ZMQ_ROUTER:
        case ZMQ_PUB:
        case ZMQ_XPUB:
        case ZMQ_SUB:
        case ZMQ_XSUB:
        case ZMQ_PUSH:
        case ZMQ_PULL:
        case ZMQ_PAIR:
        case ZMQ_STREAM:
        case ZMQ_SERVER:
        case ZMQ_CLIENT:
        case ZMQ_GATHER:
        case ZMQ_SCATTER:
        case ZMQ_DGRAM:
            s = new (std::nothrow)
              session_base_t (io_thread_, active_, socket_, options_, addr_);
            break;
        default:
            errno = EINVAL;
            return NULL;
96 97 98 99 100 101
    }
    alloc_assert (s);
    return s;
}

zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
102 103 104 105
                                     bool active_,
                                     class socket_base_t *socket_,
                                     const options_t &options_,
                                     address_t *addr_) :
106 107
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
108 109 110 111 112 113 114 115 116 117
    _active (active_),
    _pipe (NULL),
    _zap_pipe (NULL),
    _incomplete_in (false),
    _pending (false),
    _engine (NULL),
    _socket (socket_),
    _io_thread (io_thread_),
    _has_linger_timer (false),
    _addr (addr_)
118
{
119 120
}

121
const zmq::endpoint_uri_pair_t &zmq::session_base_t::get_endpoint () const
122
{
123
    return _engine->get_endpoint ();
124 125
}

126
zmq::session_base_t::~session_base_t ()
127
{
128 129
    zmq_assert (!_pipe);
    zmq_assert (!_zap_pipe);
130

131
    //  If there's still a pending linger timer, remove it.
132
    if (_has_linger_timer) {
133
        cancel_timer (linger_timer_id);
134
        _has_linger_timer = false;
135 136
    }

137
    //  Close the engine.
138 139
    if (_engine)
        _engine->terminate ();
140

141
    LIBZMQ_DELETE (_addr);
142
}
143

144
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
145
{
146
    zmq_assert (!is_terminating ());
147
    zmq_assert (!_pipe);
148
    zmq_assert (pipe_);
149 150
    _pipe = pipe_;
    _pipe->set_event_sink (this);
151 152
}

153
int zmq::session_base_t::pull_msg (msg_t *msg_)
154
{
155
    if (!_pipe || !_pipe->read (msg_)) {
156 157 158
        errno = EAGAIN;
        return -1;
    }
159

160
    _incomplete_in = (msg_->flags () & msg_t::more) != 0;
161

162
    return 0;
163 164
}

165
int zmq::session_base_t::push_msg (msg_t *msg_)
166
{
167 168 169
    //  pass subscribe/cancel to the sockets
    if ((msg_->flags () & msg_t::command) && !msg_->is_subscribe ()
        && !msg_->is_cancel ())
Jonathan Reams's avatar
Jonathan Reams committed
170
        return 0;
171
    if (_pipe && _pipe->write (msg_)) {
172 173
        int rc = msg_->init ();
        errno_assert (rc == 0);
174
        return 0;
175 176
    }

177 178
    errno = EAGAIN;
    return -1;
179 180
}

181 182
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
183
    if (_zap_pipe == NULL) {
184 185 186 187
        errno = ENOTCONN;
        return -1;
    }

188
    if (!_zap_pipe->read (msg_)) {
189 190 191 192 193 194 195 196 197
        errno = EAGAIN;
        return -1;
    }

    return 0;
}

int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
198
    if (_zap_pipe == NULL || !_zap_pipe->write (msg_)) {
199 200 201 202 203
        errno = ENOTCONN;
        return -1;
    }

    if ((msg_->flags () & msg_t::more) == 0)
204
        _zap_pipe->flush ();
205 206 207 208 209 210

    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
211 212 213 214
void zmq::session_base_t::reset ()
{
}

215
void zmq::session_base_t::flush ()
216
{
217 218
    if (_pipe)
        _pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
219 220
}

221 222 223 224 225 226
void zmq::session_base_t::rollback ()
{
    if (_pipe)
        _pipe->rollback ();
}

227
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
228
{
229
    zmq_assert (_pipe != NULL);
230

Martin Hurton's avatar
Martin Hurton committed
231 232
    //  Get rid of half-processed messages in the out pipe. Flush any
    //  unflushed messages upstream.
233 234
    _pipe->rollback ();
    _pipe->flush ();
235

Martin Hurton's avatar
Martin Hurton committed
236
    //  Remove any half-read message from the in pipe.
237
    while (_incomplete_in) {
Martin Hurton's avatar
Martin Hurton committed
238 239 240 241 242 243 244
        msg_t msg;
        int rc = msg.init ();
        errno_assert (rc == 0);
        rc = pull_msg (&msg);
        errno_assert (rc == 0);
        rc = msg.close ();
        errno_assert (rc == 0);
245
    }
246 247
}

248
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
249
{
250
    // Drop the reference to the deallocated pipe if required.
251 252
    zmq_assert (pipe_ == _pipe || pipe_ == _zap_pipe
                || _terminating_pipes.count (pipe_) == 1);
253

254
    if (pipe_ == _pipe) {
255
        // If this is our current pipe, remove it
256 257
        _pipe = NULL;
        if (_has_linger_timer) {
258
            cancel_timer (linger_timer_id);
259
            _has_linger_timer = false;
260
        }
261 262
    } else if (pipe_ == _zap_pipe)
        _zap_pipe = NULL;
263 264
    else
        // Remove the pipe from the detached pipes set
265
        _terminating_pipes.erase (pipe_);
266

267
    if (!is_terminating () && options.raw_socket) {
268 269 270
        if (_engine) {
            _engine->terminate ();
            _engine = NULL;
271
        }
Martin Hurton's avatar
Martin Hurton committed
272
        terminate ();
273 274
    }

Martin Hurton's avatar
Martin Hurton committed
275 276 277
    //  If we are waiting for pending messages to be sent, at this point
    //  we are sure that there will be no more messages and we can proceed
    //  with termination safely.
278 279
    if (_pending && !_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
        _pending = false;
Martin Hurton's avatar
Martin Hurton committed
280 281
        own_t::process_term (0);
    }
282 283
}

284
void zmq::session_base_t::read_activated (pipe_t *pipe_)
Martin Sustrik's avatar
Martin Sustrik committed
285
{
286
    // Skip activating if we're detaching this pipe
287 288
    if (unlikely (pipe_ != _pipe && pipe_ != _zap_pipe)) {
        zmq_assert (_terminating_pipes.count (pipe_) == 1);
289
        return;
290
    }
291

292 293
    if (unlikely (_engine == NULL)) {
        _pipe->check_read ();
294 295 296
        return;
    }

297 298
    if (likely (pipe_ == _pipe))
        _engine->restart_output ();
299 300
    else {
        // i.e. pipe_ == zap_pipe
301
        _engine->zap_msg_available ();
302
    }
Martin Sustrik's avatar
Martin Sustrik committed
303 304
}

305
void zmq::session_base_t::write_activated (pipe_t *pipe_)
Martin Hurton's avatar
Martin Hurton committed
306
{
307
    // Skip activating if we're detaching this pipe
308 309
    if (_pipe != pipe_) {
        zmq_assert (_terminating_pipes.count (pipe_) == 1);
310
        return;
311
    }
312

313 314
    if (_engine)
        _engine->restart_input ();
Martin Hurton's avatar
Martin Hurton committed
315 316
}

317
void zmq::session_base_t::hiccuped (pipe_t *)
318 319 320 321 322 323
{
    //  Hiccups are always sent from session to socket, not the other
    //  way round.
    zmq_assert (false);
}

324
zmq::socket_base_t *zmq::session_base_t::get_socket ()
325
{
326
    return _socket;
327 328
}

329
void zmq::session_base_t::process_plug ()
330
{
331
    if (_active)
332
        start_connecting (false);
333 334
}

335 336 337 338 339 340
//  This functions can return 0 on success or -1 and errno=ECONNREFUSED if ZAP
//  is not setup (IE: inproc://zeromq.zap.01 does not exist in the same context)
//  or it aborts on any other error. In other words, either ZAP is not
//  configured or if it is configured it MUST be configured correctly and it
//  MUST work, otherwise authentication cannot be guaranteed and it would be a
//  security flaw.
341 342
int zmq::session_base_t::zap_connect ()
{
343
    if (_zap_pipe != NULL)
344
        return 0;
345 346 347 348 349 350

    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
    if (peer.socket == NULL) {
        errno = ECONNREFUSED;
        return -1;
    }
351 352
    zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER
                || peer.options.type == ZMQ_SERVER);
353 354 355

    //  Create a bi-directional pipe that will connect
    //  session with zap socket.
356 357 358 359
    object_t *parents[2] = {this, peer.socket};
    pipe_t *new_pipes[2] = {NULL, NULL};
    int hwms[2] = {0, 0};
    bool conflates[2] = {false, false};
Ian Barber's avatar
Ian Barber committed
360
    int rc = pipepair (parents, new_pipes, hwms, conflates);
361 362 363
    errno_assert (rc == 0);

    //  Attach local end of the pipe to this socket object.
364 365 366
    _zap_pipe = new_pipes[0];
    _zap_pipe->set_nodelay ();
    _zap_pipe->set_event_sink (this);
367

368
    send_bind (peer.socket, new_pipes[1], false);
369

370 371
    //  Send empty routing id if required by the peer.
    if (peer.options.recv_routing_id) {
372 373 374
        msg_t id;
        rc = id.init ();
        errno_assert (rc == 0);
375
        id.set_flags (msg_t::routing_id);
376
        bool ok = _zap_pipe->write (&id);
377
        zmq_assert (ok);
378
        _zap_pipe->flush ();
379 380 381 382 383
    }

    return 0;
}

Min RK's avatar
Min RK committed
384 385
bool zmq::session_base_t::zap_enabled ()
{
386
    return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
Min RK's avatar
Min RK committed
387 388
}

389
void zmq::session_base_t::process_attach (i_engine *engine_)
390
{
Martin Hurton's avatar
Martin Hurton committed
391
    zmq_assert (engine_ != NULL);
392

393
    //  Create the pipe if it does not exist yet.
394 395
    if (!_pipe && !is_terminating ()) {
        object_t *parents[2] = {this, _socket};
396 397
        pipe_t *pipes[2] = {NULL, NULL};

398
        const bool conflate = get_effective_conflate_option (options);
399 400 401 402

        int hwms[2] = {conflate ? -1 : options.rcvhwm,
                       conflate ? -1 : options.sndhwm};
        bool conflates[2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
403
        int rc = pipepair (parents, pipes, hwms, conflates);
404 405 406
        errno_assert (rc == 0);

        //  Plug the local end of the pipe.
407
        pipes[0]->set_event_sink (this);
408 409

        //  Remember the local end of the pipe.
410 411
        zmq_assert (!_pipe);
        _pipe = pipes[0];
412

413 414 415 416 417
        //  The endpoints strings are not set on bind, set them here so that
        //  events can use them.
        pipes[0]->set_endpoint_pair (engine_->get_endpoint ());
        pipes[1]->set_endpoint_pair (engine_->get_endpoint ());

418
        //  Ask socket to plug into the remote end of the pipe.
419
        send_bind (_socket, pipes[1]);
420 421
    }

422
    //  Plug in the engine.
423 424 425
    zmq_assert (!_engine);
    _engine = engine_;
    _engine->plug (_io_thread, this);
426 427
}

428
void zmq::session_base_t::engine_error (zmq::i_engine::error_reason_t reason_)
429 430
{
    //  Engine is dead. Let's forget about it.
431
    _engine = NULL;
432

433
    //  Remove any half-done messages from the pipes.
434
    if (_pipe)
Martin Hurton's avatar
Martin Hurton committed
435
        clean_pipes ();
436

437 438 439
    zmq_assert (reason_ == i_engine::connection_error
                || reason_ == i_engine::timeout_error
                || reason_ == i_engine::protocol_error);
440

441
    switch (reason_) {
442
        case i_engine::timeout_error:
443
            /* FALLTHROUGH */
444
        case i_engine::connection_error:
445
            if (_active) {
446
                reconnect ();
447 448
                break;
            }
449
            /* FALLTHROUGH */
450
        case i_engine::protocol_error:
451 452 453 454 455
            if (_pending) {
                if (_pipe)
                    _pipe->terminate (false);
                if (_zap_pipe)
                    _zap_pipe->terminate (false);
456 457 458
            } else {
                terminate ();
            }
459 460
            break;
    }
461

462
    //  Just in case there's only a delimiter in the pipe.
463 464
    if (_pipe)
        _pipe->check_read ();
465

466 467
    if (_zap_pipe)
        _zap_pipe->check_read ();
468 469
}

470
void zmq::session_base_t::process_term (int linger_)
471
{
472
    zmq_assert (!_pending);
473 474 475

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
Ian Barber's avatar
Ian Barber committed
476
    //  standard termination immediately.
477
    if (!_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
Martin Hurton's avatar
Martin Hurton committed
478
        own_t::process_term (0);
479 480 481
        return;
    }

482
    _pending = true;
483

484
    if (_pipe != NULL) {
485 486 487 488
        //  If there's finite linger value, delay the termination.
        //  If linger is infinite (negative) we don't even have to set
        //  the timer.
        if (linger_ > 0) {
489
            zmq_assert (!_has_linger_timer);
490
            add_timer (linger_, linger_timer_id);
491
            _has_linger_timer = true;
492 493 494 495
        }

        //  Start pipe termination process. Delay the termination till all messages
        //  are processed in case the linger time is non-zero.
496
        _pipe->terminate (linger_ != 0);
497

498 499 500
        //  TODO: Should this go into pipe_t::terminate ?
        //  In case there's no engine and there's only delimiter in the
        //  pipe it wouldn't be ever read. Thus we check for it explicitly.
501 502
        if (!_engine)
            _pipe->check_read ();
503
    }
504

505 506
    if (_zap_pipe != NULL)
        _zap_pipe->terminate (false);
507 508
}

509
void zmq::session_base_t::timer_event (int id_)
510 511 512 513
{
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
514
    _has_linger_timer = false;
515 516

    //  Ask pipe to terminate even though there may be pending messages in it.
517 518
    zmq_assert (_pipe);
    _pipe->terminate (false);
519 520
}

Martin Hurton's avatar
Martin Hurton committed
521
void zmq::session_base_t::reconnect ()
522
{
523 524
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
525 526
    if (_pipe && options.immediate == 1 && _addr->protocol != "pgm"
        && _addr->protocol != "epgm" && _addr->protocol != "norm"
527
        && _addr->protocol != protocol_name::udp) {
528 529 530 531 532 533
        _pipe->hiccup ();
        _pipe->terminate (false);
        _terminating_pipes.insert (_pipe);
        _pipe = NULL;

        if (_has_linger_timer) {
534
            cancel_timer (linger_timer_id);
535
            _has_linger_timer = false;
536
        }
537 538
    }

539
    reset ();
540

541
    //  Reconnect.
Sergey KHripchenko's avatar
Sergey KHripchenko committed
542 543
    if (options.reconnect_ivl != -1)
        start_connecting (true);
544 545
    else {
        std::string *ep = new (std::string);
546 547
        _addr->to_string (*ep);
        send_term_endpoint (_socket, ep);
548
    }
549

550 551
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
552
    if (_pipe
553 554
        && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
            || options.type == ZMQ_DISH))
555
        _pipe->hiccup ();
556 557
}

558 559
zmq::session_base_t::connecter_factory_entry_t
  zmq::session_base_t::_connecter_factories[] = {
560 561
    connecter_factory_entry_t (protocol_name::tcp,
                               &zmq::session_base_t::create_connecter_tcp),
562 563
    connecter_factory_entry_t (protocol_name::ws,
                               &zmq::session_base_t::create_connecter_ws),
564 565
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
566 567
    connecter_factory_entry_t (protocol_name::ipc,
                               &zmq::session_base_t::create_connecter_ipc),
568 569
#endif
#if defined ZMQ_HAVE_TIPC
570 571
    connecter_factory_entry_t (protocol_name::tipc,
                               &zmq::session_base_t::create_connecter_tipc),
572 573
#endif
#if defined ZMQ_HAVE_VMCI
574 575
    connecter_factory_entry_t (protocol_name::vmci,
                               &zmq::session_base_t::create_connecter_vmci),
576 577 578 579 580 581 582 583 584 585 586
#endif
};

zmq::session_base_t::connecter_factory_map_t
  zmq::session_base_t::_connecter_factories_map (
    _connecter_factories,
    _connecter_factories
      + sizeof (_connecter_factories) / sizeof (_connecter_factories[0]));

zmq::session_base_t::start_connecting_entry_t
  zmq::session_base_t::_start_connecting_entries[] = {
587 588
    start_connecting_entry_t (protocol_name::udp,
                              &zmq::session_base_t::start_connecting_udp),
589
#if defined ZMQ_HAVE_OPENPGM
590 591 592 593
    start_connecting_entry_t ("pgm",
                              &zmq::session_base_t::start_connecting_pgm),
    start_connecting_entry_t ("epgm",
                              &zmq::session_base_t::start_connecting_pgm),
594 595
#endif
#if defined ZMQ_HAVE_NORM
596 597
    start_connecting_entry_t ("norm",
                              &zmq::session_base_t::start_connecting_norm),
598 599 600 601 602 603 604 605 606 607
#endif
};

zmq::session_base_t::start_connecting_map_t
  zmq::session_base_t::_start_connecting_map (
    _start_connecting_entries,
    _start_connecting_entries
      + sizeof (_start_connecting_entries)
          / sizeof (_start_connecting_entries[0]));

608
void zmq::session_base_t::start_connecting (bool wait_)
609
{
610
    zmq_assert (_active);
611 612 613 614 615 616 617

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.
618 619 620 621 622 623 624 625 626
    const connecter_factory_map_t::const_iterator connecter_factories_it =
      _connecter_factories_map.find (_addr->protocol);
    if (connecter_factories_it != _connecter_factories_map.end ()) {
        own_t *connecter =
          (this->*connecter_factories_it->second) (io_thread, wait_);

        alloc_assert (connecter);
        launch_child (connecter);
        return;
627
    }
628 629 630 631 632
    const start_connecting_map_t::const_iterator start_connecting_it =
      _start_connecting_map.find (_addr->protocol);
    if (start_connecting_it != _start_connecting_map.end ()) {
        (this->*start_connecting_it->second) (io_thread);
        return;
633
    }
634 635 636 637 638 639 640 641 642 643 644

    zmq_assert (false);
}

#if defined ZMQ_HAVE_VMCI
zmq::own_t *zmq::session_base_t::create_connecter_vmci (io_thread_t *io_thread_,
                                                        bool wait_)
{
    return new (std::nothrow)
      vmci_connecter_t (io_thread_, this, options, _addr, wait_);
}
645
#endif
646

647
#if defined ZMQ_HAVE_TIPC
648 649 650 651 652 653
zmq::own_t *zmq::session_base_t::create_connecter_tipc (io_thread_t *io_thread_,
                                                        bool wait_)
{
    return new (std::nothrow)
      tipc_connecter_t (io_thread_, this, options, _addr, wait_);
}
654
#endif
655 656 657 658 659 660 661 662 663

#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
zmq::own_t *zmq::session_base_t::create_connecter_ipc (io_thread_t *io_thread_,
                                                       bool wait_)
{
    return new (std::nothrow)
      ipc_connecter_t (io_thread_, this, options, _addr, wait_);
}
664
#endif
665

666 667 668 669 670 671 672
zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_,
                                                       bool wait_)
{
    if (!options.socks_proxy_address.empty ()) {
        address_t *proxy_address = new (std::nothrow) address_t (
          protocol_name::tcp, options.socks_proxy_address, this->get_ctx ());
        alloc_assert (proxy_address);
673
        socks_connecter_t *connecter = new (std::nothrow) socks_connecter_t (
674
          io_thread_, this, options, _addr, proxy_address, wait_);
675 676 677 678 679 680
        alloc_assert (connecter);
        if (!options.socks_proxy_username.empty ()) {
            connecter->set_auth_method_basic (options.socks_proxy_username,
                                              options.socks_proxy_password);
        }
        return connecter;
681 682 683 684
    }
    return new (std::nothrow)
      tcp_connecter_t (io_thread_, this, options, _addr, wait_);
}
685

686 687 688 689 690 691 692
zmq::own_t *zmq::session_base_t::create_connecter_ws (io_thread_t *io_thread_,
                                                      bool wait_)
{
    return new (std::nothrow)
      ws_connecter_t (io_thread_, this, options, _addr, wait_);
}

693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711
#ifdef ZMQ_HAVE_OPENPGM
void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_)
{
    zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
                || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

    //  For EPGM transport with UDP encapsulation of PGM is used.
    bool const udp_encapsulation = _addr->protocol == "epgm";

    //  At this point we'll create message pipes to the session straight
    //  away. There's no point in delaying it as no concept of 'connect'
    //  exists with PGM anyway.
    if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
        //  PGM sender.
        pgm_sender_t *pgm_sender =
          new (std::nothrow) pgm_sender_t (io_thread_, options);
        alloc_assert (pgm_sender);

        int rc = pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
712 713
        errno_assert (rc == 0);

714 715 716 717 718 719
        send_attach (this, pgm_sender);
    } else {
        //  PGM receiver.
        pgm_receiver_t *pgm_receiver =
          new (std::nothrow) pgm_receiver_t (io_thread_, options);
        alloc_assert (pgm_receiver);
720

721 722 723 724 725
        int rc =
          pgm_receiver->init (udp_encapsulation, _addr->address.c_str ());
        errno_assert (rc == 0);

        send_attach (this, pgm_receiver);
726
    }
727 728
}
#endif
729

730 731 732 733 734 735 736 737 738 739 740 741 742 743
#ifdef ZMQ_HAVE_NORM
void zmq::session_base_t::start_connecting_norm (io_thread_t *io_thread_)
{
    //  At this point we'll create message pipes to the session straight
    //  away. There's no point in delaying it as no concept of 'connect'
    //  exists with NORM anyway.
    if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
        //  NORM sender.
        norm_engine_t *norm_sender =
          new (std::nothrow) norm_engine_t (io_thread_, options);
        alloc_assert (norm_sender);

        int rc = norm_sender->init (_addr->address.c_str (), true, false);
        errno_assert (rc == 0);
744

745 746
        send_attach (this, norm_sender);
    } else { // ZMQ_SUB or ZMQ_XSUB
747

748 749 750 751 752 753 754 755 756
        //  NORM receiver.
        norm_engine_t *norm_receiver =
          new (std::nothrow) norm_engine_t (io_thread_, options);
        alloc_assert (norm_receiver);

        int rc = norm_receiver->init (_addr->address.c_str (), false, true);
        errno_assert (rc == 0);

        send_attach (this, norm_receiver);
757
    }
758
}
759
#endif
Martin Hurton's avatar
Martin Hurton committed
760

761 762 763 764 765 766 767 768
void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/)
{
    zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
                || options.type == ZMQ_DGRAM);

    udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
    alloc_assert (engine);

769 770
    const bool recv = options.type == ZMQ_DISH || options.type == ZMQ_DGRAM;
    const bool send = options.type == ZMQ_RADIO || options.type == ZMQ_DGRAM;
771

772 773 774 775
    const int rc = engine->init (_addr, send, recv);
    errno_assert (rc == 0);

    send_attach (this, engine);
776
}