session_base.cpp 23.9 KB
Newer Older
1
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#include "session_base.hpp"
33
#include "i_engine.hpp"
34
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "pipe.hpp"
36
#include "likely.hpp"
37
#include "tcp_connecter.hpp"
38
#include "ws_connecter.hpp"
39
#include "ipc_connecter.hpp"
40
#include "tipc_connecter.hpp"
41
#include "socks_connecter.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
42
#include "vmci_connecter.hpp"
43 44
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
45
#include "address.hpp"
bebopagogo's avatar
bebopagogo committed
46
#include "norm_engine.hpp"
47
#include "udp_engine.hpp"
48

49
#include "ctx.hpp"
50
#include "req.hpp"
51 52
#include "radio.hpp"
#include "dish.hpp"
53 54

//  Factory for session objects. The concrete class is selected from
//  options_.type: REQ, RADIO and DISH sockets get their dedicated session
//  classes, all other listed socket types get a plain session_base_t.
//  All arguments are forwarded unchanged to the chosen constructor.
//  For a socket type that is not listed, errno is set to EINVAL and NULL is
//  returned. The allocation result is checked with alloc_assert.
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
                                                  bool active_,
                                                  class socket_base_t *socket_,
                                                  const options_t &options_,
                                                  address_t *addr_)
{
    session_base_t *session = NULL;

    switch (options_.type) {
        //  Socket types with a specialised session class.
        case ZMQ_REQ:
            session = new (std::nothrow)
              req_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_RADIO:
            session = new (std::nothrow)
              radio_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_DISH:
            session = new (std::nothrow)
              dish_session_t (io_thread_, active_, socket_, options_, addr_);
            break;

        //  Socket types served by the generic session.
        case ZMQ_DEALER:
        case ZMQ_REP:
        case ZMQ_ROUTER:
        case ZMQ_PUB:
        case ZMQ_XPUB:
        case ZMQ_SUB:
        case ZMQ_XSUB:
        case ZMQ_PUSH:
        case ZMQ_PULL:
        case ZMQ_PAIR:
        case ZMQ_STREAM:
        case ZMQ_SERVER:
        case ZMQ_CLIENT:
        case ZMQ_GATHER:
        case ZMQ_SCATTER:
        case ZMQ_DGRAM:
        case ZMQ_PEER:
            session = new (std::nothrow)
              session_base_t (io_thread_, active_, socket_, options_, addr_);
            break;

        //  Unknown socket type: report it to the caller.
        default:
            errno = EINVAL;
            return NULL;
    }

    alloc_assert (session);
    return session;
}

//  Constructs a session with no pipes and no engine attached yet.
//  io_thread_ and options_ are handed to the own_t / io_object_t bases;
//  active_, socket_ and addr_ are stored as-is. The destructor releases
//  addr_ via LIBZMQ_DELETE.
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
                                     bool active_,
                                     class socket_base_t *socket_,
                                     const options_t &options_,
                                     address_t *addr_) :
    //  Base classes.
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    //  Nothing is attached initially: no pipes, no engine, no linger timer.
    _active (active_),
    _pipe (NULL),
    _zap_pipe (NULL),
    _incomplete_in (false),
    _pending (false),
    _engine (NULL),
    _socket (socket_),
    _io_thread (io_thread_),
    _has_linger_timer (false),
    _addr (addr_)
#if defined ZMQ_HAVE_WSS
    ,
    //  Copy of the WSS hostname option; used when creating a wss connecter.
    _wss_hostname (options_.wss_hostname)
#endif
{
}

126
//  Returns the endpoint pair reported by the attached engine.
//  NOTE(review): _engine is dereferenced without a NULL check, so this must
//  only be called while an engine is attached.
const zmq::endpoint_uri_pair_t &zmq::session_base_t::get_endpoint () const
{
    const endpoint_uri_pair_t &engine_endpoint = _engine->get_endpoint ();
    return engine_endpoint;
}

131
//  Destroys the session. Both pipes must already be detached at this
//  point; what remains to be cleaned up is the linger timer, the engine
//  and the address object.
zmq::session_base_t::~session_base_t ()
{
    zmq_assert (!_pipe);
    zmq_assert (!_zap_pipe);

    //  A linger timer that is still armed has to be cancelled.
    if (_has_linger_timer) {
        cancel_timer (linger_timer_id);
        _has_linger_timer = false;
    }

    //  Shut the engine down, if one is still attached.
    if (_engine != NULL) {
        _engine->terminate ();
    }

    LIBZMQ_DELETE (_addr);
}
148

149
//  Installs pipe_ as the session's data pipe and registers the session as
//  the pipe's event sink. Requires a non-NULL pipe, a session that is not
//  terminating, and no data pipe attached yet.
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
{
    zmq_assert (!is_terminating ());
    zmq_assert (!_pipe);
    zmq_assert (pipe_);

    _pipe = pipe_;
    pipe_->set_event_sink (this);
}

158
//  Reads the next message from the data pipe into msg_.
//  Returns 0 on success. Returns -1 with errno set to EAGAIN when there is
//  no pipe or the pipe has nothing to read.
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
    const bool got_message = _pipe != NULL && _pipe->read (msg_);
    if (!got_message) {
        errno = EAGAIN;
        return -1;
    }

    //  Remember whether we stopped in the middle of a multi-part message.
    _incomplete_in = (msg_->flags () & msg_t::more) ? true : false;

    return 0;
}

170
//  Writes msg_ to the data pipe.
//  Command messages other than subscribe/cancel are not forwarded; for
//  those the function returns 0 without touching the pipe.
//  Returns 0 when the message was written (msg_ is then re-initialised),
//  or -1 with errno set to EAGAIN when there is no pipe or the write was
//  refused.
int zmq::session_base_t::push_msg (msg_t *msg_)
{
    //  Only subscribe/cancel commands are passed on to the socket.
    const bool is_command = (msg_->flags () & msg_t::command) != 0;
    if (is_command && !(msg_->is_subscribe () || msg_->is_cancel ()))
        return 0;

    if (!_pipe || !_pipe->write (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    //  The message was written to the pipe; reset the caller's message.
    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

186 187
//  Reads one message from the ZAP pipe into msg_.
//  Returns 0 on success, -1 with errno ENOTCONN when there is no ZAP pipe,
//  or -1 with errno EAGAIN when the ZAP pipe has nothing to read.
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
    if (!_zap_pipe) {
        errno = ENOTCONN;
        return -1;
    }

    const bool got_message = _zap_pipe->read (msg_);
    if (got_message)
        return 0;

    errno = EAGAIN;
    return -1;
}

//  Writes msg_ to the ZAP pipe. The pipe is flushed once the final part of
//  a message (no 'more' flag) has been written, and msg_ is re-initialised
//  afterwards.
//  Returns 0 on success, or -1 with errno ENOTCONN when there is no ZAP
//  pipe or the write was refused.
int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
    const bool written = _zap_pipe != NULL && _zap_pipe->write (msg_);
    if (!written) {
        errno = ENOTCONN;
        return -1;
    }

    const bool is_last_part = (msg_->flags () & msg_t::more) == 0;
    if (is_last_part) {
        _zap_pipe->flush ();
    }

    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
216 217 218 219
//  Called from reconnect () before a new connection attempt is started.
//  NOTE(review): presumably a hook for specialised session classes to
//  override - confirm against session_base.hpp.
void zmq::session_base_t::reset ()
{
    //  The base session has no per-connection state to reset.
}

220
void zmq::session_base_t::flush ()
221
{
222 223
    if (_pipe)
        _pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
224 225
}

226 227 228 229 230 231
//  Rolls back the data pipe. Does nothing when no pipe is attached.
void zmq::session_base_t::rollback ()
{
    if (!_pipe)
        return;

    _pipe->rollback ();
}

232
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
233
{
234
    zmq_assert (_pipe != NULL);
235

Martin Hurton's avatar
Martin Hurton committed
236 237
    //  Get rid of half-processed messages in the out pipe. Flush any
    //  unflushed messages upstream.
238 239
    _pipe->rollback ();
    _pipe->flush ();
240

Martin Hurton's avatar
Martin Hurton committed
241
    //  Remove any half-read message from the in pipe.
242
    while (_incomplete_in) {
Martin Hurton's avatar
Martin Hurton committed
243 244 245 246 247 248 249
        msg_t msg;
        int rc = msg.init ();
        errno_assert (rc == 0);
        rc = pull_msg (&msg);
        errno_assert (rc == 0);
        rc = msg.close ();
        errno_assert (rc == 0);
250
    }
251 252
}

253
//  Pipe event: pipe_ has finished terminating. The session drops its
//  reference to it and, if this was the last thing a pending termination
//  was waiting for, completes that termination.
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
{
    // The pipe has to be one we know about.
    zmq_assert (pipe_ == _pipe || pipe_ == _zap_pipe
                || _terminating_pipes.count (pipe_) == 1);

    if (pipe_ == _pipe) {
        // Current data pipe: forget it, and with it the linger timer.
        _pipe = NULL;
        if (_has_linger_timer) {
            cancel_timer (linger_timer_id);
            _has_linger_timer = false;
        }
    } else if (pipe_ == _zap_pipe) {
        _zap_pipe = NULL;
    } else {
        // A pipe detached earlier (see reconnect): remove it from the set.
        _terminating_pipes.erase (pipe_);
    }

    //  With the raw_socket option set, losing a pipe ends the session:
    //  shut down the engine and start terminating.
    if (!is_terminating () && options.raw_socket) {
        if (_engine != NULL) {
            _engine->terminate ();
            _engine = NULL;
        }
        terminate ();
    }

    //  If we are waiting for pending messages to be sent, at this point
    //  we are sure that there will be no more messages and we can proceed
    //  with termination safely.
    const bool all_pipes_gone =
      _pipe == NULL && _zap_pipe == NULL && _terminating_pipes.empty ();
    if (_pending && all_pipes_gone) {
        _pending = false;
        own_t::process_term (0);
    }
}

289
//  Pipe event: pipe_ has become readable.
//  - Notifications from a pipe that is being detached are ignored.
//  - Without an engine, the data pipe (if any) is asked to check_read ().
//  - Otherwise the engine is told to resume output (data pipe) or that a
//    ZAP message is available (ZAP pipe).
void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
    // Skip activating if we're detaching this pipe
    if (unlikely (pipe_ != _pipe && pipe_ != _zap_pipe)) {
        zmq_assert (_terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (unlikely (_engine == NULL)) {
        //  The notification may come from the ZAP pipe while the data pipe
        //  is already gone, so _pipe must not be dereferenced blindly.
        if (_pipe)
            _pipe->check_read ();
        return;
    }

    if (likely (pipe_ == _pipe))
        _engine->restart_output ();
    else {
        // i.e. pipe_ == zap_pipe
        _engine->zap_msg_available ();
    }
}

310
//  Pipe event: pipe_ has become writable. For the current data pipe the
//  engine (if attached) is told to resume input. Any other pipe must be
//  one that is being detached, and its notification is ignored.
void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
    if (pipe_ == _pipe) {
        if (_engine != NULL)
            _engine->restart_input ();
        return;
    }

    // Not the current pipe, so it has to be one we're detaching
    zmq_assert (_terminating_pipes.count (pipe_) == 1);
}

322
//  Pipe event that must never reach a session: hiccups only travel from
//  the session to the socket, never in the opposite direction. Receiving
//  one is treated as a fatal error.
void zmq::session_base_t::hiccuped (pipe_t *)
{
    zmq_assert (false);
}

329
//  Returns the socket pointer that was passed to the constructor.
zmq::socket_base_t *zmq::session_base_t::get_socket () const
{
    return _socket;
}

334
void zmq::session_base_t::process_plug ()
335
{
336
    if (_active)
337
        start_connecting (false);
338 339
}

340 341 342 343 344 345
//  This functions can return 0 on success or -1 and errno=ECONNREFUSED if ZAP
//  is not setup (IE: inproc://zeromq.zap.01 does not exist in the same context)
//  or it aborts on any other error. In other words, either ZAP is not
//  configured or if it is configured it MUST be configured correctly and it
//  MUST work, otherwise authentication cannot be guaranteed and it would be a
//  security flaw.
346 347
int zmq::session_base_t::zap_connect ()
{
348
    if (_zap_pipe != NULL)
349
        return 0;
350 351 352 353 354 355

    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
    if (peer.socket == NULL) {
        errno = ECONNREFUSED;
        return -1;
    }
356 357
    zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER
                || peer.options.type == ZMQ_SERVER);
358 359 360

    //  Create a bi-directional pipe that will connect
    //  session with zap socket.
361 362 363 364
    object_t *parents[2] = {this, peer.socket};
    pipe_t *new_pipes[2] = {NULL, NULL};
    int hwms[2] = {0, 0};
    bool conflates[2] = {false, false};
Ian Barber's avatar
Ian Barber committed
365
    int rc = pipepair (parents, new_pipes, hwms, conflates);
366 367 368
    errno_assert (rc == 0);

    //  Attach local end of the pipe to this socket object.
369 370 371
    _zap_pipe = new_pipes[0];
    _zap_pipe->set_nodelay ();
    _zap_pipe->set_event_sink (this);
372

373
    send_bind (peer.socket, new_pipes[1], false);
374

375 376
    //  Send empty routing id if required by the peer.
    if (peer.options.recv_routing_id) {
377 378 379
        msg_t id;
        rc = id.init ();
        errno_assert (rc == 0);
380
        id.set_flags (msg_t::routing_id);
381
        bool ok = _zap_pipe->write (&id);
382
        zmq_assert (ok);
383
        _zap_pipe->flush ();
384 385 386 387 388
    }

    return 0;
}

389
bool zmq::session_base_t::zap_enabled () const
Min RK's avatar
Min RK committed
390
{
391
    return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
Min RK's avatar
Min RK committed
392 393
}

394
//  Attaches engine_ to the session. If the session has no data pipe yet
//  (and is not terminating), a pipe pair between the session and the socket
//  is created first. Requires engine_ to be non-NULL and no engine to be
//  attached already.
void zmq::session_base_t::process_attach (i_engine *engine_)
{
    zmq_assert (engine_ != NULL);

    //  Create the pipe if it does not exist yet.
    if (_pipe == NULL && !is_terminating ()) {
        object_t *pipe_owners[2] = {this, _socket};
        pipe_t *pipe_ends[2] = {NULL, NULL};

        //  When conflating, both HWMs are passed as -1; otherwise the
        //  configured receive/send HWMs are used.
        const bool conflate = get_effective_conflate_option (options);
        int hwms[2];
        if (conflate) {
            hwms[0] = -1;
            hwms[1] = -1;
        } else {
            hwms[0] = options.rcvhwm;
            hwms[1] = options.sndhwm;
        }
        bool conflates[2] = {conflate, conflate};

        const int rc = pipepair (pipe_owners, pipe_ends, hwms, conflates);
        errno_assert (rc == 0);

        pipe_t *const session_end = pipe_ends[0];
        pipe_t *const socket_end = pipe_ends[1];

        //  Plug and remember the local end of the pipe.
        session_end->set_event_sink (this);
        zmq_assert (!_pipe);
        _pipe = session_end;

        //  The endpoints strings are not set on bind, set them here so that
        //  events can use them.
        session_end->set_endpoint_pair (engine_->get_endpoint ());
        socket_end->set_endpoint_pair (engine_->get_endpoint ());

        //  Ask socket to plug into the remote end of the pipe.
        send_bind (_socket, socket_end);
    }

    //  Plug in the engine.
    zmq_assert (!_engine);
    _engine = engine_;
    _engine->plug (_io_thread, this);
}

433
//  Called when the engine has failed. The session forgets the engine,
//  cleans up half-done messages and then either reconnects (active session,
//  timeout or connection error) or terminates. reason_ must be one of
//  connection_error, timeout_error or protocol_error.
void zmq::session_base_t::engine_error (zmq::i_engine::error_reason_t reason_)
{
    //  The engine is gone; drop the reference to it.
    _engine = NULL;

    //  Remove any half-done messages from the pipes.
    if (_pipe != NULL) {
        clean_pipes ();
    }

    zmq_assert (reason_ == i_engine::connection_error
                || reason_ == i_engine::timeout_error
                || reason_ == i_engine::protocol_error);

    //  Timeouts and connection errors are the failures that an active
    //  session answers by reconnecting.
    const bool link_failure = reason_ == i_engine::timeout_error
                              || reason_ == i_engine::connection_error;

    if (link_failure && _active) {
        reconnect ();
    } else if (link_failure || reason_ == i_engine::protocol_error) {
        if (_pending) {
            //  Termination is already in progress; terminate the pipes
            //  without delay.
            if (_pipe != NULL)
                _pipe->terminate (false);
            if (_zap_pipe != NULL)
                _zap_pipe->terminate (false);
        } else {
            terminate ();
        }
    }

    //  Just in case there's only a delimiter in the pipe.
    if (_pipe != NULL)
        _pipe->check_read ();

    if (_zap_pipe != NULL)
        _zap_pipe->check_read ();
}

475
void zmq::session_base_t::process_term (int linger_)
476
{
477
    zmq_assert (!_pending);
478 479 480

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
Ian Barber's avatar
Ian Barber committed
481
    //  standard termination immediately.
482
    if (!_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
Martin Hurton's avatar
Martin Hurton committed
483
        own_t::process_term (0);
484 485 486
        return;
    }

487
    _pending = true;
488

489
    if (_pipe != NULL) {
490 491 492 493
        //  If there's finite linger value, delay the termination.
        //  If linger is infinite (negative) we don't even have to set
        //  the timer.
        if (linger_ > 0) {
494
            zmq_assert (!_has_linger_timer);
495
            add_timer (linger_, linger_timer_id);
496
            _has_linger_timer = true;
497 498 499 500
        }

        //  Start pipe termination process. Delay the termination till all messages
        //  are processed in case the linger time is non-zero.
501
        _pipe->terminate (linger_ != 0);
502

503 504 505
        //  TODO: Should this go into pipe_t::terminate ?
        //  In case there's no engine and there's only delimiter in the
        //  pipe it wouldn't be ever read. Thus we check for it explicitly.
506 507
        if (!_engine)
            _pipe->check_read ();
508
    }
509

510 511
    if (_zap_pipe != NULL)
        _zap_pipe->terminate (false);
512 513
}

514
void zmq::session_base_t::timer_event (int id_)
515 516 517 518
{
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
519
    _has_linger_timer = false;
520 521

    //  Ask pipe to terminate even though there may be pending messages in it.
522 523
    zmq_assert (_pipe);
    _pipe->terminate (false);
524 525
}

Martin Hurton's avatar
Martin Hurton committed
526
void zmq::session_base_t::reconnect ()
527
{
528 529
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
530 531
    if (_pipe && options.immediate == 1 && _addr->protocol != "pgm"
        && _addr->protocol != "epgm" && _addr->protocol != "norm"
532
        && _addr->protocol != protocol_name::udp) {
533 534 535 536 537 538
        _pipe->hiccup ();
        _pipe->terminate (false);
        _terminating_pipes.insert (_pipe);
        _pipe = NULL;

        if (_has_linger_timer) {
539
            cancel_timer (linger_timer_id);
540
            _has_linger_timer = false;
541
        }
542 543
    }

544
    reset ();
545

546
    //  Reconnect.
Sergey KHripchenko's avatar
Sergey KHripchenko committed
547 548
    if (options.reconnect_ivl != -1)
        start_connecting (true);
549 550
    else {
        std::string *ep = new (std::string);
551 552
        _addr->to_string (*ep);
        send_term_endpoint (_socket, ep);
553
    }
554

555 556
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
557
    if (_pipe
558 559
        && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
            || options.type == ZMQ_DISH))
560
        _pipe->hiccup ();
561 562
}

563 564
//  Protocol name -> connecter factory member function. TCP is always
//  present; every other transport is listed only when the matching
//  ZMQ_HAVE_* macro is defined. start_connecting () looks the protocol up
//  through _connecter_factories_map, which is built from this table.
zmq::session_base_t::connecter_factory_entry_t
  zmq::session_base_t::_connecter_factories[] = {
    connecter_factory_entry_t (protocol_name::tcp,
                               &zmq::session_base_t::create_connecter_tcp),
#if defined ZMQ_HAVE_WS
    connecter_factory_entry_t (protocol_name::ws,
                               &zmq::session_base_t::create_connecter_ws),
#endif
#if defined ZMQ_HAVE_WSS
    connecter_factory_entry_t (protocol_name::wss,
                               &zmq::session_base_t::create_connecter_wss),
#endif
#if defined ZMQ_HAVE_IPC
    connecter_factory_entry_t (protocol_name::ipc,
                               &zmq::session_base_t::create_connecter_ipc),
#endif
#if defined ZMQ_HAVE_TIPC
    connecter_factory_entry_t (protocol_name::tipc,
                               &zmq::session_base_t::create_connecter_tipc),
#endif
#if defined ZMQ_HAVE_VMCI
    connecter_factory_entry_t (protocol_name::vmci,
                               &zmq::session_base_t::create_connecter_vmci),
#endif
};

//  Lookup map built from the whole _connecter_factories table
//  (iterator range: first entry .. one past the last entry).
zmq::session_base_t::connecter_factory_map_t
  zmq::session_base_t::_connecter_factories_map (
    _connecter_factories,
    _connecter_factories
      + sizeof (_connecter_factories) / sizeof (*_connecter_factories));

//  Protocol name -> start_connecting_* member function, for the transports
//  that do not go through a connecter object. UDP is always present; PGM,
//  EPGM and NORM are listed only when the matching ZMQ_HAVE_* macro is
//  defined. "pgm" and "epgm" share the same handler.
zmq::session_base_t::start_connecting_entry_t
  zmq::session_base_t::_start_connecting_entries[] = {
    start_connecting_entry_t (protocol_name::udp,
                              &zmq::session_base_t::start_connecting_udp),
#ifdef ZMQ_HAVE_OPENPGM
    start_connecting_entry_t ("pgm",
                              &zmq::session_base_t::start_connecting_pgm),
    start_connecting_entry_t ("epgm",
                              &zmq::session_base_t::start_connecting_pgm),
#endif
#ifdef ZMQ_HAVE_NORM
    start_connecting_entry_t ("norm",
                              &zmq::session_base_t::start_connecting_norm),
#endif
};

//  Lookup map built from the whole _start_connecting_entries table
//  (iterator range: first entry .. one past the last entry).
zmq::session_base_t::start_connecting_map_t
  zmq::session_base_t::_start_connecting_map (
    _start_connecting_entries,
    _start_connecting_entries
      + sizeof (_start_connecting_entries)
          / sizeof (*_start_connecting_entries));

618
void zmq::session_base_t::start_connecting (bool wait_)
619
{
620
    zmq_assert (_active);
621 622 623 624 625 626 627

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.
628 629 630 631 632 633 634 635 636
    const connecter_factory_map_t::const_iterator connecter_factories_it =
      _connecter_factories_map.find (_addr->protocol);
    if (connecter_factories_it != _connecter_factories_map.end ()) {
        own_t *connecter =
          (this->*connecter_factories_it->second) (io_thread, wait_);

        alloc_assert (connecter);
        launch_child (connecter);
        return;
637
    }
638 639 640 641 642
    const start_connecting_map_t::const_iterator start_connecting_it =
      _start_connecting_map.find (_addr->protocol);
    if (start_connecting_it != _start_connecting_map.end ()) {
        (this->*start_connecting_it->second) (io_thread);
        return;
643
    }
644 645 646 647 648 649 650 651 652 653 654

    zmq_assert (false);
}

#if defined ZMQ_HAVE_VMCI
//  Allocates a VMCI connecter for this session's address. Returns NULL if
//  the allocation fails; the caller checks the result.
zmq::own_t *zmq::session_base_t::create_connecter_vmci (io_thread_t *io_thread_,
                                                        bool wait_)
{
    vmci_connecter_t *const connecter = new (std::nothrow)
      vmci_connecter_t (io_thread_, this, options, _addr, wait_);
    return connecter;
}
655
#endif
656

657
#if defined ZMQ_HAVE_TIPC
658 659 660 661 662 663
//  Allocates a TIPC connecter for this session's address. Returns NULL if
//  the allocation fails; the caller checks the result.
zmq::own_t *zmq::session_base_t::create_connecter_tipc (io_thread_t *io_thread_,
                                                        bool wait_)
{
    tipc_connecter_t *const connecter = new (std::nothrow)
      tipc_connecter_t (io_thread_, this, options, _addr, wait_);
    return connecter;
}
664
#endif
665

666
#if defined ZMQ_HAVE_IPC
667 668 669 670 671 672
//  Allocates an IPC connecter for this session's address. Returns NULL if
//  the allocation fails; the caller checks the result.
zmq::own_t *zmq::session_base_t::create_connecter_ipc (io_thread_t *io_thread_,
                                                       bool wait_)
{
    ipc_connecter_t *const connecter = new (std::nothrow)
      ipc_connecter_t (io_thread_, this, options, _addr, wait_);
    return connecter;
}
673
#endif
674

675 676 677 678 679 680 681
//  Allocates the connecter for a TCP address. Without a SOCKS proxy
//  configured this is a plain TCP connecter (NULL on allocation failure,
//  checked by the caller). With options.socks_proxy_address set, a SOCKS
//  connecter is created instead and, if a proxy username is configured,
//  set up for basic authentication.
zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_,
                                                       bool wait_)
{
    //  Direct connection: no proxy configured.
    if (options.socks_proxy_address.empty ()) {
        return new (std::nothrow)
          tcp_connecter_t (io_thread_, this, options, _addr, wait_);
    }

    //  Connection through a SOCKS proxy.
    address_t *proxy_address = new (std::nothrow) address_t (
      protocol_name::tcp, options.socks_proxy_address, this->get_ctx ());
    alloc_assert (proxy_address);

    socks_connecter_t *socks_connecter = new (std::nothrow) socks_connecter_t (
      io_thread_, this, options, _addr, proxy_address, wait_);
    alloc_assert (socks_connecter);

    const bool has_credentials = !options.socks_proxy_username.empty ();
    if (has_credentials) {
        socks_connecter->set_auth_method_basic (options.socks_proxy_username,
                                                options.socks_proxy_password);
    }
    return socks_connecter;
}
694

695
#ifdef ZMQ_HAVE_WS
696 697 698
//  Allocates a WebSocket connecter for this session's address, passing
//  false and an empty string for the connecter's last two arguments (the
//  wss variant passes true and _wss_hostname). Returns NULL if the
//  allocation fails; the caller checks the result.
zmq::own_t *zmq::session_base_t::create_connecter_ws (io_thread_t *io_thread_,
                                                      bool wait_)
{
    ws_connecter_t *const connecter = new (std::nothrow) ws_connecter_t (
      io_thread_, this, options, _addr, wait_, false, std::string ());
    return connecter;
}
#endif

#ifdef ZMQ_HAVE_WSS
//  Allocates a WebSocket connecter for this session's address, passing
//  true and the stored _wss_hostname for the connecter's last two
//  arguments. Returns NULL if the allocation fails; the caller checks the
//  result.
zmq::own_t *zmq::session_base_t::create_connecter_wss (io_thread_t *io_thread_,
                                                       bool wait_)
{
    ws_connecter_t *const connecter = new (std::nothrow) ws_connecter_t (
      io_thread_, this, options, _addr, wait_, true, _wss_hostname);
    return connecter;
}
711
#endif
712

713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731
#ifdef ZMQ_HAVE_OPENPGM
//  Starts a PGM/EPGM "connection". Only PUB, XPUB, SUB and XSUB sockets are
//  allowed. PUB/XPUB sockets get a PGM sender engine, the others a PGM
//  receiver engine. The engine is created, initialised and attached to this
//  session straight away: there's no point in delaying it as no concept of
//  'connect' exists with PGM anyway.
void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_)
{
    zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
                || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

    //  The EPGM transport uses UDP encapsulation of PGM.
    const bool udp_encapsulation = _addr->protocol == "epgm";
    const bool is_publisher =
      options.type == ZMQ_PUB || options.type == ZMQ_XPUB;

    if (!is_publisher) {
        //  Subscriber side: PGM receiver.
        pgm_receiver_t *receiver =
          new (std::nothrow) pgm_receiver_t (io_thread_, options);
        alloc_assert (receiver);

        const int rc =
          receiver->init (udp_encapsulation, _addr->address.c_str ());
        errno_assert (rc == 0);

        send_attach (this, receiver);
        return;
    }

    //  Publisher side: PGM sender.
    pgm_sender_t *sender =
      new (std::nothrow) pgm_sender_t (io_thread_, options);
    alloc_assert (sender);

    const int rc = sender->init (udp_encapsulation, _addr->address.c_str ());
    errno_assert (rc == 0);

    send_attach (this, sender);
}
#endif
749

750 751 752 753 754 755 756 757 758 759 760 761 762 763
#ifdef ZMQ_HAVE_NORM
//  Starts a NORM "connection". PUB/XPUB sockets get a NORM engine
//  initialised with (true, false) as its send/receive arguments, every
//  other socket type (expected to be SUB or XSUB) gets one initialised with
//  (false, true). The engine is created and attached straight away: there's
//  no point in delaying it as no concept of 'connect' exists with NORM
//  anyway.
void zmq::session_base_t::start_connecting_norm (io_thread_t *io_thread_)
{
    const bool is_publisher =
      options.type == ZMQ_PUB || options.type == ZMQ_XPUB;

    if (!is_publisher) {
        //  ZMQ_SUB or ZMQ_XSUB: NORM receiver.
        norm_engine_t *receiver =
          new (std::nothrow) norm_engine_t (io_thread_, options);
        alloc_assert (receiver);

        const int rc = receiver->init (_addr->address.c_str (), false, true);
        errno_assert (rc == 0);

        send_attach (this, receiver);
        return;
    }

    //  ZMQ_PUB or ZMQ_XPUB: NORM sender.
    norm_engine_t *sender =
      new (std::nothrow) norm_engine_t (io_thread_, options);
    alloc_assert (sender);

    const int rc = sender->init (_addr->address.c_str (), true, false);
    errno_assert (rc == 0);

    send_attach (this, sender);
}
779
#endif
Martin Hurton's avatar
Martin Hurton committed
780

781 782 783 784 785 786 787 788
//  Starts a UDP "connection" by creating a UDP engine and attaching it to
//  this session. Only DISH, RADIO and DGRAM sockets are allowed: DISH
//  receives, RADIO sends, DGRAM does both. The I/O thread argument is not
//  used.
void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/)
{
    zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
                || options.type == ZMQ_DGRAM);

    //  Work out the directions the engine has to handle.
    const bool is_dgram = options.type == ZMQ_DGRAM;
    const bool can_send = is_dgram || options.type == ZMQ_RADIO;
    const bool can_recv = is_dgram || options.type == ZMQ_DISH;

    udp_engine_t *udp_engine = new (std::nothrow) udp_engine_t (options);
    alloc_assert (udp_engine);

    const int rc = udp_engine->init (_addr, can_send, can_recv);
    errno_assert (rc == 0);

    send_attach (this, udp_engine);
}