session_base.cpp 24.2 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#include "session_base.hpp"
33
#include "i_engine.hpp"
34
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "pipe.hpp"
36
#include "likely.hpp"
37
#include "tcp_connecter.hpp"
38
#include "ws_connecter.hpp"
39
#include "ipc_connecter.hpp"
40
#include "tipc_connecter.hpp"
41
#include "socks_connecter.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
42
#include "vmci_connecter.hpp"
43 44
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
45
#include "address.hpp"
bebopagogo's avatar
bebopagogo committed
46
#include "norm_engine.hpp"
47
#include "udp_engine.hpp"
48

49
#include "ctx.hpp"
50
#include "req.hpp"
51 52
#include "radio.hpp"
#include "dish.hpp"
53 54

zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
55 56 57 58
                                                  bool active_,
                                                  class socket_base_t *socket_,
                                                  const options_t &options_,
                                                  address_t *addr_)
59 60 61
{
    session_base_t *s = NULL;
    switch (options_.type) {
62 63 64
        case ZMQ_REQ:
            s = new (std::nothrow)
              req_session_t (io_thread_, active_, socket_, options_, addr_);
65
            break;
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
        case ZMQ_RADIO:
            s = new (std::nothrow)
              radio_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_DISH:
            s = new (std::nothrow)
              dish_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_DEALER:
        case ZMQ_REP:
        case ZMQ_ROUTER:
        case ZMQ_PUB:
        case ZMQ_XPUB:
        case ZMQ_SUB:
        case ZMQ_XSUB:
        case ZMQ_PUSH:
        case ZMQ_PULL:
        case ZMQ_PAIR:
        case ZMQ_STREAM:
        case ZMQ_SERVER:
        case ZMQ_CLIENT:
        case ZMQ_GATHER:
        case ZMQ_SCATTER:
        case ZMQ_DGRAM:
            s = new (std::nothrow)
              session_base_t (io_thread_, active_, socket_, options_, addr_);
            break;
        default:
            errno = EINVAL;
            return NULL;
96 97 98 99 100 101
    }
    alloc_assert (s);
    return s;
}

zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
102 103 104 105
                                     bool active_,
                                     class socket_base_t *socket_,
                                     const options_t &options_,
                                     address_t *addr_) :
106 107
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
108 109 110 111 112 113 114 115 116
    _active (active_),
    _pipe (NULL),
    _zap_pipe (NULL),
    _incomplete_in (false),
    _pending (false),
    _engine (NULL),
    _socket (socket_),
    _io_thread (io_thread_),
    _has_linger_timer (false),
117 118
    _addr (addr_),
    _wss_hostname (NULL)
119
{
120 121 122 123 124
    if (options_.wss_hostname.length () > 0) {
        _wss_hostname = (char *) malloc (options_.wss_hostname.length () + 1);
        assert (_wss_hostname);
        strcpy (_wss_hostname, options_.wss_hostname.c_str ());
    }
125 126
}

127
const zmq::endpoint_uri_pair_t &zmq::session_base_t::get_endpoint () const
128
{
129
    return _engine->get_endpoint ();
130 131
}

132
zmq::session_base_t::~session_base_t ()
133
{
134 135
    zmq_assert (!_pipe);
    zmq_assert (!_zap_pipe);
136

137
    //  If there's still a pending linger timer, remove it.
138
    if (_has_linger_timer) {
139
        cancel_timer (linger_timer_id);
140
        _has_linger_timer = false;
141 142
    }

143
    //  Close the engine.
144 145
    if (_engine)
        _engine->terminate ();
146

147 148 149
    if (_wss_hostname)
        free (_wss_hostname);

150
    LIBZMQ_DELETE (_addr);
151
}
152

153
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
154
{
155
    zmq_assert (!is_terminating ());
156
    zmq_assert (!_pipe);
157
    zmq_assert (pipe_);
158 159
    _pipe = pipe_;
    _pipe->set_event_sink (this);
160 161
}

162
int zmq::session_base_t::pull_msg (msg_t *msg_)
163
{
164
    if (!_pipe || !_pipe->read (msg_)) {
165 166 167
        errno = EAGAIN;
        return -1;
    }
168

169
    _incomplete_in = (msg_->flags () & msg_t::more) != 0;
170

171
    return 0;
172 173
}

174
int zmq::session_base_t::push_msg (msg_t *msg_)
175
{
176 177 178
    //  pass subscribe/cancel to the sockets
    if ((msg_->flags () & msg_t::command) && !msg_->is_subscribe ()
        && !msg_->is_cancel ())
Jonathan Reams's avatar
Jonathan Reams committed
179
        return 0;
180
    if (_pipe && _pipe->write (msg_)) {
181 182
        int rc = msg_->init ();
        errno_assert (rc == 0);
183
        return 0;
184 185
    }

186 187
    errno = EAGAIN;
    return -1;
188 189
}

190 191
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
192
    if (_zap_pipe == NULL) {
193 194 195 196
        errno = ENOTCONN;
        return -1;
    }

197
    if (!_zap_pipe->read (msg_)) {
198 199 200 201 202 203 204 205 206
        errno = EAGAIN;
        return -1;
    }

    return 0;
}

int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
207
    if (_zap_pipe == NULL || !_zap_pipe->write (msg_)) {
208 209 210 211 212
        errno = ENOTCONN;
        return -1;
    }

    if ((msg_->flags () & msg_t::more) == 0)
213
        _zap_pipe->flush ();
214 215 216 217 218 219

    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
220 221 222 223
void zmq::session_base_t::reset ()
{
}

224
void zmq::session_base_t::flush ()
225
{
226 227
    if (_pipe)
        _pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
228 229
}

230 231 232 233 234 235
void zmq::session_base_t::rollback ()
{
    if (_pipe)
        _pipe->rollback ();
}

236
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
237
{
238
    zmq_assert (_pipe != NULL);
239

Martin Hurton's avatar
Martin Hurton committed
240 241
    //  Get rid of half-processed messages in the out pipe. Flush any
    //  unflushed messages upstream.
242 243
    _pipe->rollback ();
    _pipe->flush ();
244

Martin Hurton's avatar
Martin Hurton committed
245
    //  Remove any half-read message from the in pipe.
246
    while (_incomplete_in) {
Martin Hurton's avatar
Martin Hurton committed
247 248 249 250 251 252 253
        msg_t msg;
        int rc = msg.init ();
        errno_assert (rc == 0);
        rc = pull_msg (&msg);
        errno_assert (rc == 0);
        rc = msg.close ();
        errno_assert (rc == 0);
254
    }
255 256
}

257
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
258
{
259
    // Drop the reference to the deallocated pipe if required.
260 261
    zmq_assert (pipe_ == _pipe || pipe_ == _zap_pipe
                || _terminating_pipes.count (pipe_) == 1);
262

263
    if (pipe_ == _pipe) {
264
        // If this is our current pipe, remove it
265 266
        _pipe = NULL;
        if (_has_linger_timer) {
267
            cancel_timer (linger_timer_id);
268
            _has_linger_timer = false;
269
        }
270 271
    } else if (pipe_ == _zap_pipe)
        _zap_pipe = NULL;
272 273
    else
        // Remove the pipe from the detached pipes set
274
        _terminating_pipes.erase (pipe_);
275

276
    if (!is_terminating () && options.raw_socket) {
277 278 279
        if (_engine) {
            _engine->terminate ();
            _engine = NULL;
280
        }
Martin Hurton's avatar
Martin Hurton committed
281
        terminate ();
282 283
    }

Martin Hurton's avatar
Martin Hurton committed
284 285 286
    //  If we are waiting for pending messages to be sent, at this point
    //  we are sure that there will be no more messages and we can proceed
    //  with termination safely.
287 288
    if (_pending && !_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
        _pending = false;
Martin Hurton's avatar
Martin Hurton committed
289 290
        own_t::process_term (0);
    }
291 292
}

293
void zmq::session_base_t::read_activated (pipe_t *pipe_)
Martin Sustrik's avatar
Martin Sustrik committed
294
{
295
    // Skip activating if we're detaching this pipe
296 297
    if (unlikely (pipe_ != _pipe && pipe_ != _zap_pipe)) {
        zmq_assert (_terminating_pipes.count (pipe_) == 1);
298
        return;
299
    }
300

301 302
    if (unlikely (_engine == NULL)) {
        _pipe->check_read ();
303 304 305
        return;
    }

306 307
    if (likely (pipe_ == _pipe))
        _engine->restart_output ();
308 309
    else {
        // i.e. pipe_ == zap_pipe
310
        _engine->zap_msg_available ();
311
    }
Martin Sustrik's avatar
Martin Sustrik committed
312 313
}

314
void zmq::session_base_t::write_activated (pipe_t *pipe_)
Martin Hurton's avatar
Martin Hurton committed
315
{
316
    // Skip activating if we're detaching this pipe
317 318
    if (_pipe != pipe_) {
        zmq_assert (_terminating_pipes.count (pipe_) == 1);
319
        return;
320
    }
321

322 323
    if (_engine)
        _engine->restart_input ();
Martin Hurton's avatar
Martin Hurton committed
324 325
}

326
void zmq::session_base_t::hiccuped (pipe_t *)
327 328 329 330 331 332
{
    //  Hiccups are always sent from session to socket, not the other
    //  way round.
    zmq_assert (false);
}

333
zmq::socket_base_t *zmq::session_base_t::get_socket ()
334
{
335
    return _socket;
336 337
}

338
void zmq::session_base_t::process_plug ()
339
{
340
    if (_active)
341
        start_connecting (false);
342 343
}

344 345 346 347 348 349
//  This functions can return 0 on success or -1 and errno=ECONNREFUSED if ZAP
//  is not setup (IE: inproc://zeromq.zap.01 does not exist in the same context)
//  or it aborts on any other error. In other words, either ZAP is not
//  configured or if it is configured it MUST be configured correctly and it
//  MUST work, otherwise authentication cannot be guaranteed and it would be a
//  security flaw.
350 351
int zmq::session_base_t::zap_connect ()
{
352
    if (_zap_pipe != NULL)
353
        return 0;
354 355 356 357 358 359

    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
    if (peer.socket == NULL) {
        errno = ECONNREFUSED;
        return -1;
    }
360 361
    zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER
                || peer.options.type == ZMQ_SERVER);
362 363 364

    //  Create a bi-directional pipe that will connect
    //  session with zap socket.
365 366 367 368
    object_t *parents[2] = {this, peer.socket};
    pipe_t *new_pipes[2] = {NULL, NULL};
    int hwms[2] = {0, 0};
    bool conflates[2] = {false, false};
Ian Barber's avatar
Ian Barber committed
369
    int rc = pipepair (parents, new_pipes, hwms, conflates);
370 371 372
    errno_assert (rc == 0);

    //  Attach local end of the pipe to this socket object.
373 374 375
    _zap_pipe = new_pipes[0];
    _zap_pipe->set_nodelay ();
    _zap_pipe->set_event_sink (this);
376

377
    send_bind (peer.socket, new_pipes[1], false);
378

379 380
    //  Send empty routing id if required by the peer.
    if (peer.options.recv_routing_id) {
381 382 383
        msg_t id;
        rc = id.init ();
        errno_assert (rc == 0);
384
        id.set_flags (msg_t::routing_id);
385
        bool ok = _zap_pipe->write (&id);
386
        zmq_assert (ok);
387
        _zap_pipe->flush ();
388 389 390 391 392
    }

    return 0;
}

Min RK's avatar
Min RK committed
393 394
bool zmq::session_base_t::zap_enabled ()
{
395
    return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
Min RK's avatar
Min RK committed
396 397
}

398
void zmq::session_base_t::process_attach (i_engine *engine_)
399
{
Martin Hurton's avatar
Martin Hurton committed
400
    zmq_assert (engine_ != NULL);
401

402
    //  Create the pipe if it does not exist yet.
403 404
    if (!_pipe && !is_terminating ()) {
        object_t *parents[2] = {this, _socket};
405 406
        pipe_t *pipes[2] = {NULL, NULL};

407
        const bool conflate = get_effective_conflate_option (options);
408 409 410 411

        int hwms[2] = {conflate ? -1 : options.rcvhwm,
                       conflate ? -1 : options.sndhwm};
        bool conflates[2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
412
        int rc = pipepair (parents, pipes, hwms, conflates);
413 414 415
        errno_assert (rc == 0);

        //  Plug the local end of the pipe.
416
        pipes[0]->set_event_sink (this);
417 418

        //  Remember the local end of the pipe.
419 420
        zmq_assert (!_pipe);
        _pipe = pipes[0];
421

422 423 424 425 426
        //  The endpoints strings are not set on bind, set them here so that
        //  events can use them.
        pipes[0]->set_endpoint_pair (engine_->get_endpoint ());
        pipes[1]->set_endpoint_pair (engine_->get_endpoint ());

427
        //  Ask socket to plug into the remote end of the pipe.
428
        send_bind (_socket, pipes[1]);
429 430
    }

431
    //  Plug in the engine.
432 433 434
    zmq_assert (!_engine);
    _engine = engine_;
    _engine->plug (_io_thread, this);
435 436
}

437
void zmq::session_base_t::engine_error (zmq::i_engine::error_reason_t reason_)
438 439
{
    //  Engine is dead. Let's forget about it.
440
    _engine = NULL;
441

442
    //  Remove any half-done messages from the pipes.
443
    if (_pipe)
Martin Hurton's avatar
Martin Hurton committed
444
        clean_pipes ();
445

446 447 448
    zmq_assert (reason_ == i_engine::connection_error
                || reason_ == i_engine::timeout_error
                || reason_ == i_engine::protocol_error);
449

450
    switch (reason_) {
451
        case i_engine::timeout_error:
452
            /* FALLTHROUGH */
453
        case i_engine::connection_error:
454
            if (_active) {
455
                reconnect ();
456 457
                break;
            }
458
            /* FALLTHROUGH */
459
        case i_engine::protocol_error:
460 461 462 463 464
            if (_pending) {
                if (_pipe)
                    _pipe->terminate (false);
                if (_zap_pipe)
                    _zap_pipe->terminate (false);
465 466 467
            } else {
                terminate ();
            }
468 469
            break;
    }
470

471
    //  Just in case there's only a delimiter in the pipe.
472 473
    if (_pipe)
        _pipe->check_read ();
474

475 476
    if (_zap_pipe)
        _zap_pipe->check_read ();
477 478
}

479
void zmq::session_base_t::process_term (int linger_)
480
{
481
    zmq_assert (!_pending);
482 483 484

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
Ian Barber's avatar
Ian Barber committed
485
    //  standard termination immediately.
486
    if (!_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
Martin Hurton's avatar
Martin Hurton committed
487
        own_t::process_term (0);
488 489 490
        return;
    }

491
    _pending = true;
492

493
    if (_pipe != NULL) {
494 495 496 497
        //  If there's finite linger value, delay the termination.
        //  If linger is infinite (negative) we don't even have to set
        //  the timer.
        if (linger_ > 0) {
498
            zmq_assert (!_has_linger_timer);
499
            add_timer (linger_, linger_timer_id);
500
            _has_linger_timer = true;
501 502 503 504
        }

        //  Start pipe termination process. Delay the termination till all messages
        //  are processed in case the linger time is non-zero.
505
        _pipe->terminate (linger_ != 0);
506

507 508 509
        //  TODO: Should this go into pipe_t::terminate ?
        //  In case there's no engine and there's only delimiter in the
        //  pipe it wouldn't be ever read. Thus we check for it explicitly.
510 511
        if (!_engine)
            _pipe->check_read ();
512
    }
513

514 515
    if (_zap_pipe != NULL)
        _zap_pipe->terminate (false);
516 517
}

518
void zmq::session_base_t::timer_event (int id_)
519 520 521 522
{
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
523
    _has_linger_timer = false;
524 525

    //  Ask pipe to terminate even though there may be pending messages in it.
526 527
    zmq_assert (_pipe);
    _pipe->terminate (false);
528 529
}

Martin Hurton's avatar
Martin Hurton committed
530
void zmq::session_base_t::reconnect ()
531
{
532 533
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
534 535
    if (_pipe && options.immediate == 1 && _addr->protocol != "pgm"
        && _addr->protocol != "epgm" && _addr->protocol != "norm"
536
        && _addr->protocol != protocol_name::udp) {
537 538 539 540 541 542
        _pipe->hiccup ();
        _pipe->terminate (false);
        _terminating_pipes.insert (_pipe);
        _pipe = NULL;

        if (_has_linger_timer) {
543
            cancel_timer (linger_timer_id);
544
            _has_linger_timer = false;
545
        }
546 547
    }

548
    reset ();
549

550
    //  Reconnect.
Sergey KHripchenko's avatar
Sergey KHripchenko committed
551 552
    if (options.reconnect_ivl != -1)
        start_connecting (true);
553 554
    else {
        std::string *ep = new (std::string);
555 556
        _addr->to_string (*ep);
        send_term_endpoint (_socket, ep);
557
    }
558

559 560
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
561
    if (_pipe
562 563
        && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
            || options.type == ZMQ_DISH))
564
        _pipe->hiccup ();
565 566
}

567 568
zmq::session_base_t::connecter_factory_entry_t
  zmq::session_base_t::_connecter_factories[] = {
569 570
    connecter_factory_entry_t (protocol_name::tcp,
                               &zmq::session_base_t::create_connecter_tcp),
571
#ifdef ZMQ_HAVE_WS
572 573
    connecter_factory_entry_t (protocol_name::ws,
                               &zmq::session_base_t::create_connecter_ws),
574
#endif
575 576 577 578
#ifdef ZMQ_HAVE_WSS
    connecter_factory_entry_t (protocol_name::wss,
                               &zmq::session_base_t::create_connecter_wss),
#endif
579 580
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
581 582
    connecter_factory_entry_t (protocol_name::ipc,
                               &zmq::session_base_t::create_connecter_ipc),
583 584
#endif
#if defined ZMQ_HAVE_TIPC
585 586
    connecter_factory_entry_t (protocol_name::tipc,
                               &zmq::session_base_t::create_connecter_tipc),
587 588
#endif
#if defined ZMQ_HAVE_VMCI
589 590
    connecter_factory_entry_t (protocol_name::vmci,
                               &zmq::session_base_t::create_connecter_vmci),
591 592 593 594 595 596 597 598 599 600 601
#endif
};

zmq::session_base_t::connecter_factory_map_t
  zmq::session_base_t::_connecter_factories_map (
    _connecter_factories,
    _connecter_factories
      + sizeof (_connecter_factories) / sizeof (_connecter_factories[0]));

zmq::session_base_t::start_connecting_entry_t
  zmq::session_base_t::_start_connecting_entries[] = {
602 603
    start_connecting_entry_t (protocol_name::udp,
                              &zmq::session_base_t::start_connecting_udp),
604
#if defined ZMQ_HAVE_OPENPGM
605 606 607 608
    start_connecting_entry_t ("pgm",
                              &zmq::session_base_t::start_connecting_pgm),
    start_connecting_entry_t ("epgm",
                              &zmq::session_base_t::start_connecting_pgm),
609 610
#endif
#if defined ZMQ_HAVE_NORM
611 612
    start_connecting_entry_t ("norm",
                              &zmq::session_base_t::start_connecting_norm),
613 614 615 616 617 618 619 620 621 622
#endif
};

zmq::session_base_t::start_connecting_map_t
  zmq::session_base_t::_start_connecting_map (
    _start_connecting_entries,
    _start_connecting_entries
      + sizeof (_start_connecting_entries)
          / sizeof (_start_connecting_entries[0]));

623
void zmq::session_base_t::start_connecting (bool wait_)
624
{
625
    zmq_assert (_active);
626 627 628 629 630 631 632

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.
633 634 635 636 637 638 639 640 641
    const connecter_factory_map_t::const_iterator connecter_factories_it =
      _connecter_factories_map.find (_addr->protocol);
    if (connecter_factories_it != _connecter_factories_map.end ()) {
        own_t *connecter =
          (this->*connecter_factories_it->second) (io_thread, wait_);

        alloc_assert (connecter);
        launch_child (connecter);
        return;
642
    }
643 644 645 646 647
    const start_connecting_map_t::const_iterator start_connecting_it =
      _start_connecting_map.find (_addr->protocol);
    if (start_connecting_it != _start_connecting_map.end ()) {
        (this->*start_connecting_it->second) (io_thread);
        return;
648
    }
649 650 651 652 653 654 655 656 657 658 659

    zmq_assert (false);
}

#if defined ZMQ_HAVE_VMCI
zmq::own_t *zmq::session_base_t::create_connecter_vmci (io_thread_t *io_thread_,
                                                        bool wait_)
{
    return new (std::nothrow)
      vmci_connecter_t (io_thread_, this, options, _addr, wait_);
}
660
#endif
661

662
#if defined ZMQ_HAVE_TIPC
663 664 665 666 667 668
zmq::own_t *zmq::session_base_t::create_connecter_tipc (io_thread_t *io_thread_,
                                                        bool wait_)
{
    return new (std::nothrow)
      tipc_connecter_t (io_thread_, this, options, _addr, wait_);
}
669
#endif
670 671 672 673 674 675 676 677 678

#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
zmq::own_t *zmq::session_base_t::create_connecter_ipc (io_thread_t *io_thread_,
                                                       bool wait_)
{
    return new (std::nothrow)
      ipc_connecter_t (io_thread_, this, options, _addr, wait_);
}
679
#endif
680

681 682 683 684 685 686 687
zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_,
                                                       bool wait_)
{
    if (!options.socks_proxy_address.empty ()) {
        address_t *proxy_address = new (std::nothrow) address_t (
          protocol_name::tcp, options.socks_proxy_address, this->get_ctx ());
        alloc_assert (proxy_address);
688
        socks_connecter_t *connecter = new (std::nothrow) socks_connecter_t (
689
          io_thread_, this, options, _addr, proxy_address, wait_);
690 691 692 693 694 695
        alloc_assert (connecter);
        if (!options.socks_proxy_username.empty ()) {
            connecter->set_auth_method_basic (options.socks_proxy_username,
                                              options.socks_proxy_password);
        }
        return connecter;
696 697 698 699
    }
    return new (std::nothrow)
      tcp_connecter_t (io_thread_, this, options, _addr, wait_);
}
700

701
#ifdef ZMQ_HAVE_WS
702 703 704 705
zmq::own_t *zmq::session_base_t::create_connecter_ws (io_thread_t *io_thread_,
                                                      bool wait_)
{
    return new (std::nothrow)
706 707 708 709 710 711 712 713 714 715
      ws_connecter_t (io_thread_, this, options, _addr, wait_, false, NULL);
}
#endif

#ifdef ZMQ_HAVE_WSS
zmq::own_t *zmq::session_base_t::create_connecter_wss (io_thread_t *io_thread_,
                                                       bool wait_)
{
    return new (std::nothrow) ws_connecter_t (io_thread_, this, options, _addr,
                                              wait_, true, _wss_hostname);
716
}
717
#endif
718

719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737
#ifdef ZMQ_HAVE_OPENPGM
void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_)
{
    zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
                || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

    //  For EPGM transport with UDP encapsulation of PGM is used.
    bool const udp_encapsulation = _addr->protocol == "epgm";

    //  At this point we'll create message pipes to the session straight
    //  away. There's no point in delaying it as no concept of 'connect'
    //  exists with PGM anyway.
    if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
        //  PGM sender.
        pgm_sender_t *pgm_sender =
          new (std::nothrow) pgm_sender_t (io_thread_, options);
        alloc_assert (pgm_sender);

        int rc = pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
738 739
        errno_assert (rc == 0);

740 741 742 743 744 745
        send_attach (this, pgm_sender);
    } else {
        //  PGM receiver.
        pgm_receiver_t *pgm_receiver =
          new (std::nothrow) pgm_receiver_t (io_thread_, options);
        alloc_assert (pgm_receiver);
746

747 748 749 750 751
        int rc =
          pgm_receiver->init (udp_encapsulation, _addr->address.c_str ());
        errno_assert (rc == 0);

        send_attach (this, pgm_receiver);
752
    }
753 754
}
#endif
755

756 757 758 759 760 761 762 763 764 765 766 767 768 769
#ifdef ZMQ_HAVE_NORM
void zmq::session_base_t::start_connecting_norm (io_thread_t *io_thread_)
{
    //  At this point we'll create message pipes to the session straight
    //  away. There's no point in delaying it as no concept of 'connect'
    //  exists with NORM anyway.
    if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
        //  NORM sender.
        norm_engine_t *norm_sender =
          new (std::nothrow) norm_engine_t (io_thread_, options);
        alloc_assert (norm_sender);

        int rc = norm_sender->init (_addr->address.c_str (), true, false);
        errno_assert (rc == 0);
770

771 772
        send_attach (this, norm_sender);
    } else { // ZMQ_SUB or ZMQ_XSUB
773

774 775 776 777 778 779 780 781 782
        //  NORM receiver.
        norm_engine_t *norm_receiver =
          new (std::nothrow) norm_engine_t (io_thread_, options);
        alloc_assert (norm_receiver);

        int rc = norm_receiver->init (_addr->address.c_str (), false, true);
        errno_assert (rc == 0);

        send_attach (this, norm_receiver);
783
    }
784
}
785
#endif
Martin Hurton's avatar
Martin Hurton committed
786

787 788 789 790 791 792 793 794
void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/)
{
    zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
                || options.type == ZMQ_DGRAM);

    udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
    alloc_assert (engine);

795 796
    const bool recv = options.type == ZMQ_DISH || options.type == ZMQ_DGRAM;
    const bool send = options.type == ZMQ_RADIO || options.type == ZMQ_DGRAM;
797

798 799 800 801
    const int rc = engine->init (_addr, send, recv);
    errno_assert (rc == 0);

    send_attach (this, engine);
802
}