session_base.cpp 20.8 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#include "session_base.hpp"
33
#include "i_engine.hpp"
34
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "pipe.hpp"
36
#include "likely.hpp"
37
#include "tcp_connecter.hpp"
38
#include "ipc_connecter.hpp"
39
#include "tipc_connecter.hpp"
40
#include "socks_connecter.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
41
#include "vmci_connecter.hpp"
42 43
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
44
#include "address.hpp"
bebopagogo's avatar
bebopagogo committed
45
#include "norm_engine.hpp"
46
#include "udp_engine.hpp"
47

48
#include "ctx.hpp"
49
#include "req.hpp"
50 51
#include "radio.hpp"
#include "dish.hpp"
52 53

zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
54 55 56 57
                                                  bool active_,
                                                  class socket_base_t *socket_,
                                                  const options_t &options_,
                                                  address_t *addr_)
58 59 60
{
    session_base_t *s = NULL;
    switch (options_.type) {
61 62 63
        case ZMQ_REQ:
            s = new (std::nothrow)
              req_session_t (io_thread_, active_, socket_, options_, addr_);
64
            break;
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
        case ZMQ_RADIO:
            s = new (std::nothrow)
              radio_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_DISH:
            s = new (std::nothrow)
              dish_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_DEALER:
        case ZMQ_REP:
        case ZMQ_ROUTER:
        case ZMQ_PUB:
        case ZMQ_XPUB:
        case ZMQ_SUB:
        case ZMQ_XSUB:
        case ZMQ_PUSH:
        case ZMQ_PULL:
        case ZMQ_PAIR:
        case ZMQ_STREAM:
        case ZMQ_SERVER:
        case ZMQ_CLIENT:
        case ZMQ_GATHER:
        case ZMQ_SCATTER:
        case ZMQ_DGRAM:
            s = new (std::nothrow)
              session_base_t (io_thread_, active_, socket_, options_, addr_);
            break;
        default:
            errno = EINVAL;
            return NULL;
95 96 97 98 99 100
    }
    alloc_assert (s);
    return s;
}

zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
101 102 103 104
                                     bool active_,
                                     class socket_base_t *socket_,
                                     const options_t &options_,
                                     address_t *addr_) :
105 106
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
107 108 109 110 111 112 113 114 115 116
    _active (active_),
    _pipe (NULL),
    _zap_pipe (NULL),
    _incomplete_in (false),
    _pending (false),
    _engine (NULL),
    _socket (socket_),
    _io_thread (io_thread_),
    _has_linger_timer (false),
    _addr (addr_)
117
{
118 119
}

120 121
const char *zmq::session_base_t::get_endpoint () const
{
122
    return _engine->get_endpoint ();
123 124
}

125
zmq::session_base_t::~session_base_t ()
126
{
127 128
    zmq_assert (!_pipe);
    zmq_assert (!_zap_pipe);
129

130
    //  If there's still a pending linger timer, remove it.
131
    if (_has_linger_timer) {
132
        cancel_timer (linger_timer_id);
133
        _has_linger_timer = false;
134 135
    }

136
    //  Close the engine.
137 138
    if (_engine)
        _engine->terminate ();
139

140
    LIBZMQ_DELETE (_addr);
141
}
142

143
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
144
{
145
    zmq_assert (!is_terminating ());
146
    zmq_assert (!_pipe);
147
    zmq_assert (pipe_);
148 149
    _pipe = pipe_;
    _pipe->set_event_sink (this);
150 151
}

152
int zmq::session_base_t::pull_msg (msg_t *msg_)
153
{
154
    if (!_pipe || !_pipe->read (msg_)) {
155 156 157
        errno = EAGAIN;
        return -1;
    }
158

159
    _incomplete_in = (msg_->flags () & msg_t::more) != 0;
160

161
    return 0;
162 163
}

164
int zmq::session_base_t::push_msg (msg_t *msg_)
165
{
166 167 168
    //  pass subscribe/cancel to the sockets
    if ((msg_->flags () & msg_t::command) && !msg_->is_subscribe ()
        && !msg_->is_cancel ())
Jonathan Reams's avatar
Jonathan Reams committed
169
        return 0;
170
    if (_pipe && _pipe->write (msg_)) {
171 172
        int rc = msg_->init ();
        errno_assert (rc == 0);
173
        return 0;
174 175
    }

176 177
    errno = EAGAIN;
    return -1;
178 179
}

180 181
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
182
    if (_zap_pipe == NULL) {
183 184 185 186
        errno = ENOTCONN;
        return -1;
    }

187
    if (!_zap_pipe->read (msg_)) {
188 189 190 191 192 193 194 195 196
        errno = EAGAIN;
        return -1;
    }

    return 0;
}

int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
197
    if (_zap_pipe == NULL || !_zap_pipe->write (msg_)) {
198 199 200 201 202
        errno = ENOTCONN;
        return -1;
    }

    if ((msg_->flags () & msg_t::more) == 0)
203
        _zap_pipe->flush ();
204 205 206 207 208 209

    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
210 211 212 213
void zmq::session_base_t::reset ()
{
}

214
void zmq::session_base_t::flush ()
215
{
216 217
    if (_pipe)
        _pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
218 219
}

220
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
221
{
222
    zmq_assert (_pipe != NULL);
223

Martin Hurton's avatar
Martin Hurton committed
224 225
    //  Get rid of half-processed messages in the out pipe. Flush any
    //  unflushed messages upstream.
226 227
    _pipe->rollback ();
    _pipe->flush ();
228

Martin Hurton's avatar
Martin Hurton committed
229
    //  Remove any half-read message from the in pipe.
230
    while (_incomplete_in) {
Martin Hurton's avatar
Martin Hurton committed
231 232 233 234 235 236 237
        msg_t msg;
        int rc = msg.init ();
        errno_assert (rc == 0);
        rc = pull_msg (&msg);
        errno_assert (rc == 0);
        rc = msg.close ();
        errno_assert (rc == 0);
238
    }
239 240
}

241
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
242
{
243
    // Drop the reference to the deallocated pipe if required.
244 245
    zmq_assert (pipe_ == _pipe || pipe_ == _zap_pipe
                || _terminating_pipes.count (pipe_) == 1);
246

247
    if (pipe_ == _pipe) {
248
        // If this is our current pipe, remove it
249 250
        _pipe = NULL;
        if (_has_linger_timer) {
251
            cancel_timer (linger_timer_id);
252
            _has_linger_timer = false;
253
        }
254 255
    } else if (pipe_ == _zap_pipe)
        _zap_pipe = NULL;
256 257
    else
        // Remove the pipe from the detached pipes set
258
        _terminating_pipes.erase (pipe_);
259

260
    if (!is_terminating () && options.raw_socket) {
261 262 263
        if (_engine) {
            _engine->terminate ();
            _engine = NULL;
264
        }
Martin Hurton's avatar
Martin Hurton committed
265
        terminate ();
266 267
    }

Martin Hurton's avatar
Martin Hurton committed
268 269 270
    //  If we are waiting for pending messages to be sent, at this point
    //  we are sure that there will be no more messages and we can proceed
    //  with termination safely.
271 272
    if (_pending && !_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
        _pending = false;
Martin Hurton's avatar
Martin Hurton committed
273 274
        own_t::process_term (0);
    }
275 276
}

277
void zmq::session_base_t::read_activated (pipe_t *pipe_)
Martin Sustrik's avatar
Martin Sustrik committed
278
{
279
    // Skip activating if we're detaching this pipe
280 281
    if (unlikely (pipe_ != _pipe && pipe_ != _zap_pipe)) {
        zmq_assert (_terminating_pipes.count (pipe_) == 1);
282
        return;
283
    }
284

285 286
    if (unlikely (_engine == NULL)) {
        _pipe->check_read ();
287 288 289
        return;
    }

290 291
    if (likely (pipe_ == _pipe))
        _engine->restart_output ();
292 293
    else {
        // i.e. pipe_ == zap_pipe
294
        _engine->zap_msg_available ();
295
    }
Martin Sustrik's avatar
Martin Sustrik committed
296 297
}

298
void zmq::session_base_t::write_activated (pipe_t *pipe_)
Martin Hurton's avatar
Martin Hurton committed
299
{
300
    // Skip activating if we're detaching this pipe
301 302
    if (_pipe != pipe_) {
        zmq_assert (_terminating_pipes.count (pipe_) == 1);
303
        return;
304
    }
305

306 307
    if (_engine)
        _engine->restart_input ();
Martin Hurton's avatar
Martin Hurton committed
308 309
}

310
void zmq::session_base_t::hiccuped (pipe_t *)
311 312 313 314 315 316
{
    //  Hiccups are always sent from session to socket, not the other
    //  way round.
    zmq_assert (false);
}

317
zmq::socket_base_t *zmq::session_base_t::get_socket ()
318
{
319
    return _socket;
320 321
}

322
void zmq::session_base_t::process_plug ()
323
{
324
    if (_active)
325
        start_connecting (false);
326 327
}

328 329 330 331 332 333
//  This functions can return 0 on success or -1 and errno=ECONNREFUSED if ZAP
//  is not setup (IE: inproc://zeromq.zap.01 does not exist in the same context)
//  or it aborts on any other error. In other words, either ZAP is not
//  configured or if it is configured it MUST be configured correctly and it
//  MUST work, otherwise authentication cannot be guaranteed and it would be a
//  security flaw.
334 335
int zmq::session_base_t::zap_connect ()
{
336
    if (_zap_pipe != NULL)
337
        return 0;
338 339 340 341 342 343

    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
    if (peer.socket == NULL) {
        errno = ECONNREFUSED;
        return -1;
    }
344 345
    zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER
                || peer.options.type == ZMQ_SERVER);
346 347 348

    //  Create a bi-directional pipe that will connect
    //  session with zap socket.
349 350 351 352
    object_t *parents[2] = {this, peer.socket};
    pipe_t *new_pipes[2] = {NULL, NULL};
    int hwms[2] = {0, 0};
    bool conflates[2] = {false, false};
Ian Barber's avatar
Ian Barber committed
353
    int rc = pipepair (parents, new_pipes, hwms, conflates);
354 355 356
    errno_assert (rc == 0);

    //  Attach local end of the pipe to this socket object.
357 358 359
    _zap_pipe = new_pipes[0];
    _zap_pipe->set_nodelay ();
    _zap_pipe->set_event_sink (this);
360

361
    send_bind (peer.socket, new_pipes[1], false);
362

363 364
    //  Send empty routing id if required by the peer.
    if (peer.options.recv_routing_id) {
365 366 367
        msg_t id;
        rc = id.init ();
        errno_assert (rc == 0);
368
        id.set_flags (msg_t::routing_id);
369
        bool ok = _zap_pipe->write (&id);
370
        zmq_assert (ok);
371
        _zap_pipe->flush ();
372 373 374 375 376
    }

    return 0;
}

Min RK's avatar
Min RK committed
377 378
bool zmq::session_base_t::zap_enabled ()
{
379
    return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
Min RK's avatar
Min RK committed
380 381
}

382
void zmq::session_base_t::process_attach (i_engine *engine_)
383
{
Martin Hurton's avatar
Martin Hurton committed
384
    zmq_assert (engine_ != NULL);
385

386
    //  Create the pipe if it does not exist yet.
387 388
    if (!_pipe && !is_terminating ()) {
        object_t *parents[2] = {this, _socket};
389 390 391 392 393 394 395 396 397 398 399
        pipe_t *pipes[2] = {NULL, NULL};

        bool conflate =
          options.conflate
          && (options.type == ZMQ_DEALER || options.type == ZMQ_PULL
              || options.type == ZMQ_PUSH || options.type == ZMQ_PUB
              || options.type == ZMQ_SUB);

        int hwms[2] = {conflate ? -1 : options.rcvhwm,
                       conflate ? -1 : options.sndhwm};
        bool conflates[2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
400
        int rc = pipepair (parents, pipes, hwms, conflates);
401 402 403
        errno_assert (rc == 0);

        //  Plug the local end of the pipe.
404
        pipes[0]->set_event_sink (this);
405 406

        //  Remember the local end of the pipe.
407 408
        zmq_assert (!_pipe);
        _pipe = pipes[0];
409

410
        //  Ask socket to plug into the remote end of the pipe.
411
        send_bind (_socket, pipes[1]);
412 413
    }

414
    //  Plug in the engine.
415 416 417
    zmq_assert (!_engine);
    _engine = engine_;
    _engine->plug (_io_thread, this);
418 419
}

420
void zmq::session_base_t::engine_error (
421
  zmq::stream_engine_t::error_reason_t reason_)
422 423
{
    //  Engine is dead. Let's forget about it.
424
    _engine = NULL;
425

426
    //  Remove any half-done messages from the pipes.
427
    if (_pipe)
Martin Hurton's avatar
Martin Hurton committed
428
        clean_pipes ();
429

430 431 432
    zmq_assert (reason_ == stream_engine_t::connection_error
                || reason_ == stream_engine_t::timeout_error
                || reason_ == stream_engine_t::protocol_error);
433

434
    switch (reason_) {
435
        case stream_engine_t::timeout_error:
436
            /* FALLTHROUGH */
437
        case stream_engine_t::connection_error:
438
            if (_active) {
439
                reconnect ();
440 441
                break;
            }
442
            /* FALLTHROUGH */
443
        case stream_engine_t::protocol_error:
444 445 446 447 448
            if (_pending) {
                if (_pipe)
                    _pipe->terminate (false);
                if (_zap_pipe)
                    _zap_pipe->terminate (false);
449 450 451
            } else {
                terminate ();
            }
452 453
            break;
    }
454

455
    //  Just in case there's only a delimiter in the pipe.
456 457
    if (_pipe)
        _pipe->check_read ();
458

459 460
    if (_zap_pipe)
        _zap_pipe->check_read ();
461 462
}

463
void zmq::session_base_t::process_term (int linger_)
464
{
465
    zmq_assert (!_pending);
466 467 468

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
Ian Barber's avatar
Ian Barber committed
469
    //  standard termination immediately.
470
    if (!_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
Martin Hurton's avatar
Martin Hurton committed
471
        own_t::process_term (0);
472 473 474
        return;
    }

475
    _pending = true;
476

477
    if (_pipe != NULL) {
478 479 480 481
        //  If there's finite linger value, delay the termination.
        //  If linger is infinite (negative) we don't even have to set
        //  the timer.
        if (linger_ > 0) {
482
            zmq_assert (!_has_linger_timer);
483
            add_timer (linger_, linger_timer_id);
484
            _has_linger_timer = true;
485 486 487 488
        }

        //  Start pipe termination process. Delay the termination till all messages
        //  are processed in case the linger time is non-zero.
489
        _pipe->terminate (linger_ != 0);
490

491 492 493
        //  TODO: Should this go into pipe_t::terminate ?
        //  In case there's no engine and there's only delimiter in the
        //  pipe it wouldn't be ever read. Thus we check for it explicitly.
494 495
        if (!_engine)
            _pipe->check_read ();
496
    }
497

498 499
    if (_zap_pipe != NULL)
        _zap_pipe->terminate (false);
500 501
}

502
void zmq::session_base_t::timer_event (int id_)
503 504 505 506
{
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
507
    _has_linger_timer = false;
508 509

    //  Ask pipe to terminate even though there may be pending messages in it.
510 511
    zmq_assert (_pipe);
    _pipe->terminate (false);
512 513
}

Martin Hurton's avatar
Martin Hurton committed
514
void zmq::session_base_t::reconnect ()
515
{
516 517
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
518 519
    if (_pipe && options.immediate == 1 && _addr->protocol != "pgm"
        && _addr->protocol != "epgm" && _addr->protocol != "norm"
520
        && _addr->protocol != protocol_name::udp) {
521 522 523 524 525 526
        _pipe->hiccup ();
        _pipe->terminate (false);
        _terminating_pipes.insert (_pipe);
        _pipe = NULL;

        if (_has_linger_timer) {
527
            cancel_timer (linger_timer_id);
528
            _has_linger_timer = false;
529
        }
530 531
    }

532
    reset ();
533

534
    //  Reconnect.
Sergey KHripchenko's avatar
Sergey KHripchenko committed
535 536
    if (options.reconnect_ivl != -1)
        start_connecting (true);
537 538
    else {
        std::string *ep = new (std::string);
539 540
        _addr->to_string (*ep);
        send_term_endpoint (_socket, ep);
541
    }
542

543 544
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
545
    if (_pipe
546 547
        && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
            || options.type == ZMQ_DISH))
548
        _pipe->hiccup ();
549 550
}

551
void zmq::session_base_t::start_connecting (bool wait_)
552
{
553
    zmq_assert (_active);
554 555 556 557 558 559 560 561

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.

562
    if (_addr->protocol == protocol_name::tcp) {
563
        if (!options.socks_proxy_address.empty ()) {
564
            address_t *proxy_address = new (std::nothrow)
565 566
              address_t (protocol_name::tcp, options.socks_proxy_address,
                         this->get_ctx ());
567
            alloc_assert (proxy_address);
568 569 570
            socks_connecter_t *connecter = new (std::nothrow)
              socks_connecter_t (io_thread, this, options, _addr, proxy_address,
                                 wait_);
571 572
            alloc_assert (connecter);
            launch_child (connecter);
573
        } else {
574
            tcp_connecter_t *connecter = new (std::nothrow)
575
              tcp_connecter_t (io_thread, this, options, _addr, wait_);
576 577 578
            alloc_assert (connecter);
            launch_child (connecter);
        }
579 580 581
        return;
    }

582 583
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
584
    if (_addr->protocol == protocol_name::ipc) {
585
        ipc_connecter_t *connecter = new (std::nothrow)
586
          ipc_connecter_t (io_thread, this, options, _addr, wait_);
587 588 589 590
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }
591
#endif
592
#if defined ZMQ_HAVE_TIPC
593
    if (_addr->protocol == protocol_name::tipc) {
594
        tipc_connecter_t *connecter = new (std::nothrow)
595
          tipc_connecter_t (io_thread, this, options, _addr, wait_);
596
        alloc_assert (connecter);
Martin Hurton's avatar
Martin Hurton committed
597
        launch_child (connecter);
598 599 600 601
        return;
    }
#endif

602
    if (_addr->protocol == protocol_name::udp) {
603 604
        zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
                    || options.type == ZMQ_DGRAM);
605

606
        udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
607
        alloc_assert (engine);
608

609 610
        bool recv = false;
        bool send = false;
611

612 613 614
        if (options.type == ZMQ_RADIO) {
            send = true;
            recv = false;
615
        } else if (options.type == ZMQ_DISH) {
616 617
            send = false;
            recv = true;
618
        } else if (options.type == ZMQ_DGRAM) {
619 620 621
            send = true;
            recv = true;
        }
622

623
        int rc = engine->init (_addr, send, recv);
624
        errno_assert (rc == 0);
625

626
        send_attach (this, engine);
627

628 629
        return;
    }
630

631
#ifdef ZMQ_HAVE_OPENPGM
632

633
    //  Both PGM and EPGM transports are using the same infrastructure.
634
    if (_addr->protocol == "pgm" || _addr->protocol == "epgm") {
635
        zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
636
                    || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
637

638
        //  For EPGM transport with UDP encapsulation of PGM is used.
639
        bool const udp_encapsulation = _addr->protocol == "epgm";
640 641 642 643 644 645

        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with PGM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
            //  PGM sender.
646 647
            pgm_sender_t *pgm_sender =
              new (std::nothrow) pgm_sender_t (io_thread, options);
648 649
            alloc_assert (pgm_sender);

650
            int rc =
651
              pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
652
            errno_assert (rc == 0);
653 654

            send_attach (this, pgm_sender);
655
        } else {
656
            //  PGM receiver.
657 658
            pgm_receiver_t *pgm_receiver =
              new (std::nothrow) pgm_receiver_t (io_thread, options);
659 660
            alloc_assert (pgm_receiver);

661
            int rc =
662
              pgm_receiver->init (udp_encapsulation, _addr->address.c_str ());
663
            errno_assert (rc == 0);
664 665 666 667 668 669 670

            send_attach (this, pgm_receiver);
        }

        return;
    }
#endif
Martin Hurton's avatar
Martin Hurton committed
671

bebopagogo's avatar
bebopagogo committed
672
#ifdef ZMQ_HAVE_NORM
673
    if (_addr->protocol == "norm") {
bebopagogo's avatar
bebopagogo committed
674 675 676 677 678
        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with NORM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
            //  NORM sender.
679 680
            norm_engine_t *norm_sender =
              new (std::nothrow) norm_engine_t (io_thread, options);
bebopagogo's avatar
bebopagogo committed
681 682
            alloc_assert (norm_sender);

683
            int rc = norm_sender->init (_addr->address.c_str (), true, false);
bebopagogo's avatar
bebopagogo committed
684 685 686
            errno_assert (rc == 0);

            send_attach (this, norm_sender);
687
        } else { // ZMQ_SUB or ZMQ_XSUB
bebopagogo's avatar
bebopagogo committed
688 689

            //  NORM receiver.
690 691
            norm_engine_t *norm_receiver =
              new (std::nothrow) norm_engine_t (io_thread, options);
bebopagogo's avatar
bebopagogo committed
692 693
            alloc_assert (norm_receiver);

694
            int rc = norm_receiver->init (_addr->address.c_str (), false, true);
bebopagogo's avatar
bebopagogo committed
695 696 697 698 699 700 701
            errno_assert (rc == 0);

            send_attach (this, norm_receiver);
        }
        return;
    }
#endif // ZMQ_HAVE_NORM
702

Ilya Kulakov's avatar
Ilya Kulakov committed
703
#if defined ZMQ_HAVE_VMCI
704
    if (_addr->protocol == protocol_name::vmci) {
705
        vmci_connecter_t *connecter = new (std::nothrow)
706
          vmci_connecter_t (io_thread, this, options, _addr, wait_);
Ilya Kulakov's avatar
Ilya Kulakov committed
707 708 709 710 711 712
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }
#endif

713 714
    zmq_assert (false);
}