session_base.cpp 20.5 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#include "session_base.hpp"
33
#include "i_engine.hpp"
34
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "pipe.hpp"
36
#include "likely.hpp"
37
#include "tcp_connecter.hpp"
38
#include "ipc_connecter.hpp"
39
#include "tipc_connecter.hpp"
40
#include "socks_connecter.hpp"
Ilya Kulakov's avatar
Ilya Kulakov committed
41
#include "vmci_connecter.hpp"
42 43
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
44
#include "address.hpp"
bebopagogo's avatar
bebopagogo committed
45
#include "norm_engine.hpp"
46
#include "udp_engine.hpp"
47

48
#include "ctx.hpp"
49
#include "req.hpp"
50 51
#include "radio.hpp"
#include "dish.hpp"
52 53

zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
54 55 56 57
                                                  bool active_,
                                                  class socket_base_t *socket_,
                                                  const options_t &options_,
                                                  address_t *addr_)
58 59 60
{
    session_base_t *s = NULL;
    switch (options_.type) {
61 62 63
        case ZMQ_REQ:
            s = new (std::nothrow)
              req_session_t (io_thread_, active_, socket_, options_, addr_);
64
            break;
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
        case ZMQ_RADIO:
            s = new (std::nothrow)
              radio_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_DISH:
            s = new (std::nothrow)
              dish_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_DEALER:
        case ZMQ_REP:
        case ZMQ_ROUTER:
        case ZMQ_PUB:
        case ZMQ_XPUB:
        case ZMQ_SUB:
        case ZMQ_XSUB:
        case ZMQ_PUSH:
        case ZMQ_PULL:
        case ZMQ_PAIR:
        case ZMQ_STREAM:
        case ZMQ_SERVER:
        case ZMQ_CLIENT:
        case ZMQ_GATHER:
        case ZMQ_SCATTER:
        case ZMQ_DGRAM:
            s = new (std::nothrow)
              session_base_t (io_thread_, active_, socket_, options_, addr_);
            break;
        default:
            errno = EINVAL;
            return NULL;
95 96 97 98 99 100
    }
    alloc_assert (s);
    return s;
}

zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
101 102 103 104
                                     bool active_,
                                     class socket_base_t *socket_,
                                     const options_t &options_,
                                     address_t *addr_) :
105 106
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
Martin Hurton's avatar
Martin Hurton committed
107
    active (active_),
108
    pipe (NULL),
109
    zap_pipe (NULL),
110
    incomplete_in (false),
111
    pending (false),
112
    engine (NULL),
113 114
    socket (socket_),
    io_thread (io_thread_),
115
    has_linger_timer (false),
116
    addr (addr_)
117
{
118 119
}

120 121 122 123 124
const char *zmq::session_base_t::get_endpoint () const
{
    return engine->get_endpoint ();
}

125
zmq::session_base_t::~session_base_t ()
126
{
127
    zmq_assert (!pipe);
128
    zmq_assert (!zap_pipe);
129

130 131 132 133 134 135
    //  If there's still a pending linger timer, remove it.
    if (has_linger_timer) {
        cancel_timer (linger_timer_id);
        has_linger_timer = false;
    }

136 137 138
    //  Close the engine.
    if (engine)
        engine->terminate ();
139

140
    LIBZMQ_DELETE (addr);
141
}
142

143
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
144
{
145
    zmq_assert (!is_terminating ());
146 147 148 149
    zmq_assert (!pipe);
    zmq_assert (pipe_);
    pipe = pipe_;
    pipe->set_event_sink (this);
150 151
}

152
int zmq::session_base_t::pull_msg (msg_t *msg_)
153
{
154 155 156 157
    if (!pipe || !pipe->read (msg_)) {
        errno = EAGAIN;
        return -1;
    }
158

159
    incomplete_in = msg_->flags () & msg_t::more ? true : false;
160

161
    return 0;
162 163
}

164
int zmq::session_base_t::push_msg (msg_t *msg_)
165
{
166
    if (msg_->flags () & msg_t::command)
Jonathan Reams's avatar
Jonathan Reams committed
167
        return 0;
168
    if (pipe && pipe->write (msg_)) {
169 170
        int rc = msg_->init ();
        errno_assert (rc == 0);
171
        return 0;
172 173
    }

174 175
    errno = EAGAIN;
    return -1;
176 177
}

178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
    if (zap_pipe == NULL) {
        errno = ENOTCONN;
        return -1;
    }

    if (!zap_pipe->read (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    return 0;
}

int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
195
    if (zap_pipe == NULL || !zap_pipe->write (msg_)) {
196 197 198 199 200 201 202 203 204 205 206 207
        errno = ENOTCONN;
        return -1;
    }

    if ((msg_->flags () & msg_t::more) == 0)
        zap_pipe->flush ();

    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
208 209 210 211
void zmq::session_base_t::reset ()
{
}

212
void zmq::session_base_t::flush ()
213
{
214 215
    if (pipe)
        pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
216 217
}

218
void zmq::session_base_t::clean_pipes ()
Martin Sustrik's avatar
Martin Sustrik committed
219
{
Martin Hurton's avatar
Martin Hurton committed
220
    zmq_assert (pipe != NULL);
221

Martin Hurton's avatar
Martin Hurton committed
222 223 224 225
    //  Get rid of half-processed messages in the out pipe. Flush any
    //  unflushed messages upstream.
    pipe->rollback ();
    pipe->flush ();
226

Martin Hurton's avatar
Martin Hurton committed
227 228 229 230 231 232 233 234 235
    //  Remove any half-read message from the in pipe.
    while (incomplete_in) {
        msg_t msg;
        int rc = msg.init ();
        errno_assert (rc == 0);
        rc = pull_msg (&msg);
        errno_assert (rc == 0);
        rc = msg.close ();
        errno_assert (rc == 0);
236
    }
237 238
}

239
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
240
{
241
    // Drop the reference to the deallocated pipe if required.
242 243
    zmq_assert (pipe_ == pipe || pipe_ == zap_pipe
                || terminating_pipes.count (pipe_) == 1);
244

245
    if (pipe_ == pipe) {
246 247
        // If this is our current pipe, remove it
        pipe = NULL;
248 249 250 251
        if (has_linger_timer) {
            cancel_timer (linger_timer_id);
            has_linger_timer = false;
        }
252
    } else if (pipe_ == zap_pipe)
253
        zap_pipe = NULL;
254 255
    else
        // Remove the pipe from the detached pipes set
256
        terminating_pipes.erase (pipe_);
257

258
    if (!is_terminating () && options.raw_socket) {
Martin Hurton's avatar
Martin Hurton committed
259
        if (engine) {
260 261 262
            engine->terminate ();
            engine = NULL;
        }
Martin Hurton's avatar
Martin Hurton committed
263
        terminate ();
264 265
    }

Martin Hurton's avatar
Martin Hurton committed
266 267 268
    //  If we are waiting for pending messages to be sent, at this point
    //  we are sure that there will be no more messages and we can proceed
    //  with termination safely.
Martin Hurton's avatar
Martin Hurton committed
269 270 271 272
    if (pending && !pipe && !zap_pipe && terminating_pipes.empty ()) {
        pending = false;
        own_t::process_term (0);
    }
273 274
}

275
void zmq::session_base_t::read_activated (pipe_t *pipe_)
Martin Sustrik's avatar
Martin Sustrik committed
276
{
277
    // Skip activating if we're detaching this pipe
Martin Hurton's avatar
Martin Hurton committed
278
    if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) {
279
        zmq_assert (terminating_pipes.count (pipe_) == 1);
280
        return;
281
    }
282

283 284 285 286 287 288
    if (unlikely (engine == NULL)) {
        pipe->check_read ();
        return;
    }

    if (likely (pipe_ == pipe))
289
        engine->restart_output ();
290 291
    else {
        // i.e. pipe_ == zap_pipe
292
        engine->zap_msg_available ();
293
    }
Martin Sustrik's avatar
Martin Sustrik committed
294 295
}

296
void zmq::session_base_t::write_activated (pipe_t *pipe_)
Martin Hurton's avatar
Martin Hurton committed
297
{
298
    // Skip activating if we're detaching this pipe
299 300
    if (pipe != pipe_) {
        zmq_assert (terminating_pipes.count (pipe_) == 1);
301
        return;
302
    }
303

Martin Hurton's avatar
Martin Hurton committed
304
    if (engine)
305
        engine->restart_input ();
Martin Hurton's avatar
Martin Hurton committed
306 307
}

308
void zmq::session_base_t::hiccuped (pipe_t *)
309 310 311 312 313 314
{
    //  Hiccups are always sent from session to socket, not the other
    //  way round.
    zmq_assert (false);
}

315
zmq::socket_base_t *zmq::session_base_t::get_socket ()
316
{
317
    return socket;
318 319
}

320
void zmq::session_base_t::process_plug ()
321
{
Martin Hurton's avatar
Martin Hurton committed
322
    if (active)
323
        start_connecting (false);
324 325
}

326 327 328 329 330 331
//  This functions can return 0 on success or -1 and errno=ECONNREFUSED if ZAP
//  is not setup (IE: inproc://zeromq.zap.01 does not exist in the same context)
//  or it aborts on any other error. In other words, either ZAP is not
//  configured or if it is configured it MUST be configured correctly and it
//  MUST work, otherwise authentication cannot be guaranteed and it would be a
//  security flaw.
332 333
int zmq::session_base_t::zap_connect ()
{
334 335
    if (zap_pipe != NULL)
        return 0;
336 337 338 339 340 341

    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
    if (peer.socket == NULL) {
        errno = ECONNREFUSED;
        return -1;
    }
342 343
    zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER
                || peer.options.type == ZMQ_SERVER);
344 345 346

    //  Create a bi-directional pipe that will connect
    //  session with zap socket.
347 348 349 350
    object_t *parents[2] = {this, peer.socket};
    pipe_t *new_pipes[2] = {NULL, NULL};
    int hwms[2] = {0, 0};
    bool conflates[2] = {false, false};
Ian Barber's avatar
Ian Barber committed
351
    int rc = pipepair (parents, new_pipes, hwms, conflates);
352 353 354
    errno_assert (rc == 0);

    //  Attach local end of the pipe to this socket object.
355
    zap_pipe = new_pipes[0];
Ian Barber's avatar
Ian Barber committed
356
    zap_pipe->set_nodelay ();
357 358
    zap_pipe->set_event_sink (this);

359
    send_bind (peer.socket, new_pipes[1], false);
360

361 362
    //  Send empty routing id if required by the peer.
    if (peer.options.recv_routing_id) {
363 364 365
        msg_t id;
        rc = id.init ();
        errno_assert (rc == 0);
366
        id.set_flags (msg_t::routing_id);
367 368 369
        bool ok = zap_pipe->write (&id);
        zmq_assert (ok);
        zap_pipe->flush ();
370 371 372 373 374
    }

    return 0;
}

Min RK's avatar
Min RK committed
375 376
bool zmq::session_base_t::zap_enabled ()
{
377
    return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
Min RK's avatar
Min RK committed
378 379
}

380
void zmq::session_base_t::process_attach (i_engine *engine_)
381
{
Martin Hurton's avatar
Martin Hurton committed
382
    zmq_assert (engine_ != NULL);
383

384
    //  Create the pipe if it does not exist yet.
385
    if (!pipe && !is_terminating ()) {
386 387 388 389 390 391 392 393 394 395 396 397
        object_t *parents[2] = {this, socket};
        pipe_t *pipes[2] = {NULL, NULL};

        bool conflate =
          options.conflate
          && (options.type == ZMQ_DEALER || options.type == ZMQ_PULL
              || options.type == ZMQ_PUSH || options.type == ZMQ_PUB
              || options.type == ZMQ_SUB);

        int hwms[2] = {conflate ? -1 : options.rcvhwm,
                       conflate ? -1 : options.sndhwm};
        bool conflates[2] = {conflate, conflate};
Ian Barber's avatar
Ian Barber committed
398
        int rc = pipepair (parents, pipes, hwms, conflates);
399 400 401
        errno_assert (rc == 0);

        //  Plug the local end of the pipe.
402
        pipes[0]->set_event_sink (this);
403 404

        //  Remember the local end of the pipe.
405
        zmq_assert (!pipe);
406
        pipe = pipes[0];
407

408
        //  Ask socket to plug into the remote end of the pipe.
409
        send_bind (socket, pipes[1]);
410 411
    }

412
    //  Plug in the engine.
413
    zmq_assert (!engine);
414
    engine = engine_;
415 416 417
    engine->plug (io_thread, this);
}

418
void zmq::session_base_t::engine_error (
419
  zmq::stream_engine_t::error_reason_t reason)
420 421 422 423
{
    //  Engine is dead. Let's forget about it.
    engine = NULL;

424
    //  Remove any half-done messages from the pipes.
Martin Hurton's avatar
Martin Hurton committed
425 426
    if (pipe)
        clean_pipes ();
427

428
    zmq_assert (reason == stream_engine_t::connection_error
429 430
                || reason == stream_engine_t::timeout_error
                || reason == stream_engine_t::protocol_error);
431 432 433

    switch (reason) {
        case stream_engine_t::timeout_error:
434
            /* FALLTHROUGH */
435
        case stream_engine_t::connection_error:
436
            if (active) {
437
                reconnect ();
438 439
                break;
            }
440
            /* FALLTHROUGH */
441
        case stream_engine_t::protocol_error:
442 443 444 445 446 447 448 449
            if (pending) {
                if (pipe)
                    pipe->terminate (0);
                if (zap_pipe)
                    zap_pipe->terminate (0);
            } else {
                terminate ();
            }
450 451
            break;
    }
452

453
    //  Just in case there's only a delimiter in the pipe.
454 455
    if (pipe)
        pipe->check_read ();
456 457 458

    if (zap_pipe)
        zap_pipe->check_read ();
459 460
}

461
void zmq::session_base_t::process_term (int linger_)
462
{
463 464 465 466
    zmq_assert (!pending);

    //  If the termination of the pipe happens before the term command is
    //  delivered there's nothing much to do. We can proceed with the
Ian Barber's avatar
Ian Barber committed
467
    //  standard termination immediately.
Martin Hurton's avatar
Martin Hurton committed
468 469
    if (!pipe && !zap_pipe && terminating_pipes.empty ()) {
        own_t::process_term (0);
470 471 472 473 474
        return;
    }

    pending = true;

475 476 477 478 479 480 481 482 483 484 485 486 487
    if (pipe != NULL) {
        //  If there's finite linger value, delay the termination.
        //  If linger is infinite (negative) we don't even have to set
        //  the timer.
        if (linger_ > 0) {
            zmq_assert (!has_linger_timer);
            add_timer (linger_, linger_timer_id);
            has_linger_timer = true;
        }

        //  Start pipe termination process. Delay the termination till all messages
        //  are processed in case the linger time is non-zero.
        pipe->terminate (linger_ != 0);
488

489 490 491
        //  TODO: Should this go into pipe_t::terminate ?
        //  In case there's no engine and there's only delimiter in the
        //  pipe it wouldn't be ever read. Thus we check for it explicitly.
492 493
        if (!engine)
            pipe->check_read ();
494
    }
495

496 497
    if (zap_pipe != NULL)
        zap_pipe->terminate (false);
498 499
}

500
void zmq::session_base_t::timer_event (int id_)
501 502 503 504 505
{
    //  Linger period expired. We can proceed with termination even though
    //  there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
    has_linger_timer = false;
506 507 508

    //  Ask pipe to terminate even though there may be pending messages in it.
    zmq_assert (pipe);
509
    pipe->terminate (false);
510 511
}

Martin Hurton's avatar
Martin Hurton committed
512
void zmq::session_base_t::reconnect ()
513
{
514 515
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
516 517 518
    if (pipe && options.immediate == 1 && addr->protocol != "pgm"
        && addr->protocol != "epgm" && addr->protocol != "norm"
        && addr->protocol != "udp") {
519 520
        pipe->hiccup ();
        pipe->terminate (false);
521
        terminating_pipes.insert (pipe);
522
        pipe = NULL;
523 524 525 526 527

        if (has_linger_timer) {
            cancel_timer (linger_timer_id);
            has_linger_timer = false;
        }
528 529
    }

530
    reset ();
531

532
    //  Reconnect.
Sergey KHripchenko's avatar
Sergey KHripchenko committed
533 534
    if (options.reconnect_ivl != -1)
        start_connecting (true);
535 536 537 538 539
    else {
        std::string *ep = new (std::string);
        addr->to_string (*ep);
        send_term_endpoint (socket, ep);
    }
540

541 542
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
543 544 545
    if (pipe
        && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
            || options.type == ZMQ_DISH))
Sergey KHripchenko's avatar
Sergey KHripchenko committed
546
        pipe->hiccup ();
547 548
}

549
void zmq::session_base_t::start_connecting (bool wait_)
550
{
Martin Hurton's avatar
Martin Hurton committed
551
    zmq_assert (active);
552 553 554 555 556 557 558 559

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.

560
    if (addr->protocol == "tcp") {
561
        if (!options.socks_proxy_address.empty ()) {
562
            address_t *proxy_address = new (std::nothrow)
563
              address_t ("tcp", options.socks_proxy_address, this->get_ctx ());
564 565
            alloc_assert (proxy_address);
            socks_connecter_t *connecter =
566 567
              new (std::nothrow) socks_connecter_t (io_thread, this, options,
                                                    addr, proxy_address, wait_);
568 569
            alloc_assert (connecter);
            launch_child (connecter);
570
        } else {
571
            tcp_connecter_t *connecter = new (std::nothrow)
572
              tcp_connecter_t (io_thread, this, options, addr, wait_);
573 574 575
            alloc_assert (connecter);
            launch_child (connecter);
        }
576 577 578
        return;
    }

579 580
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS                     \
  && !defined ZMQ_HAVE_VXWORKS
581
    if (addr->protocol == "ipc") {
582 583
        ipc_connecter_t *connecter = new (std::nothrow)
          ipc_connecter_t (io_thread, this, options, addr, wait_);
584 585 586 587
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }
588
#endif
589
#if defined ZMQ_HAVE_TIPC
590
    if (addr->protocol == "tipc") {
591 592
        tipc_connecter_t *connecter = new (std::nothrow)
          tipc_connecter_t (io_thread, this, options, addr, wait_);
593
        alloc_assert (connecter);
Martin Hurton's avatar
Martin Hurton committed
594
        launch_child (connecter);
595 596 597 598
        return;
    }
#endif

599
    if (addr->protocol == "udp") {
600 601
        zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
                    || options.type == ZMQ_DGRAM);
602

603
        udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
604
        alloc_assert (engine);
605

606 607
        bool recv = false;
        bool send = false;
608

609 610 611
        if (options.type == ZMQ_RADIO) {
            send = true;
            recv = false;
612
        } else if (options.type == ZMQ_DISH) {
613 614
            send = false;
            recv = true;
615
        } else if (options.type == ZMQ_DGRAM) {
616 617 618
            send = true;
            recv = true;
        }
619

620 621
        int rc = engine->init (addr, send, recv);
        errno_assert (rc == 0);
622

623
        send_attach (this, engine);
624

625 626
        return;
    }
627

628
#ifdef ZMQ_HAVE_OPENPGM
629

630
    //  Both PGM and EPGM transports are using the same infrastructure.
631
    if (addr->protocol == "pgm" || addr->protocol == "epgm") {
632
        zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
633
                    || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
634

635
        //  For EPGM transport with UDP encapsulation of PGM is used.
636
        bool const udp_encapsulation = addr->protocol == "epgm";
637 638 639 640 641 642

        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with PGM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
            //  PGM sender.
643 644
            pgm_sender_t *pgm_sender =
              new (std::nothrow) pgm_sender_t (io_thread, options);
645 646
            alloc_assert (pgm_sender);

647 648
            int rc =
              pgm_sender->init (udp_encapsulation, addr->address.c_str ());
649
            errno_assert (rc == 0);
650 651

            send_attach (this, pgm_sender);
652
        } else {
653
            //  PGM receiver.
654 655
            pgm_receiver_t *pgm_receiver =
              new (std::nothrow) pgm_receiver_t (io_thread, options);
656 657
            alloc_assert (pgm_receiver);

658 659
            int rc =
              pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
660
            errno_assert (rc == 0);
661 662 663 664 665 666 667

            send_attach (this, pgm_receiver);
        }

        return;
    }
#endif
Martin Hurton's avatar
Martin Hurton committed
668

bebopagogo's avatar
bebopagogo committed
669
#ifdef ZMQ_HAVE_NORM
Martin Hurton's avatar
Martin Hurton committed
670
    if (addr->protocol == "norm") {
bebopagogo's avatar
bebopagogo committed
671 672 673 674 675
        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with NORM anyway.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
            //  NORM sender.
676 677
            norm_engine_t *norm_sender =
              new (std::nothrow) norm_engine_t (io_thread, options);
bebopagogo's avatar
bebopagogo committed
678 679 680 681 682 683
            alloc_assert (norm_sender);

            int rc = norm_sender->init (addr->address.c_str (), true, false);
            errno_assert (rc == 0);

            send_attach (this, norm_sender);
684
        } else { // ZMQ_SUB or ZMQ_XSUB
bebopagogo's avatar
bebopagogo committed
685 686

            //  NORM receiver.
687 688
            norm_engine_t *norm_receiver =
              new (std::nothrow) norm_engine_t (io_thread, options);
bebopagogo's avatar
bebopagogo committed
689 690 691 692 693 694 695 696 697 698
            alloc_assert (norm_receiver);

            int rc = norm_receiver->init (addr->address.c_str (), false, true);
            errno_assert (rc == 0);

            send_attach (this, norm_receiver);
        }
        return;
    }
#endif // ZMQ_HAVE_NORM
699

Ilya Kulakov's avatar
Ilya Kulakov committed
700 701
#if defined ZMQ_HAVE_VMCI
    if (addr->protocol == "vmci") {
702 703
        vmci_connecter_t *connecter = new (std::nothrow)
          vmci_connecter_t (io_thread, this, options, addr, wait_);
Ilya Kulakov's avatar
Ilya Kulakov committed
704 705 706 707 708 709
        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }
#endif

710 711
    zmq_assert (false);
}