/*
    Copyright (c) 2007-2011 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "connect_session.hpp"
#include "zmq_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"

//  Constructs a session that actively connects to a remote endpoint.
//  The I/O thread, owning socket and options are handed to the session_t
//  base; the protocol name and address are copied into the object so that
//  start_connecting () can use them later. The session starts out in the
//  "never connected" state.
zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
      class socket_base_t *socket_, const options_t &options_,
      const char *protocol_, const char *address_) :
    session_t (io_thread_, socket_, options_),
    protocol (protocol_),
    address (address_),
    connected (false)
{
    //  Nothing to do here; the connection attempt is started from
    //  process_plug ().
}

//  Destroys the session. A stored (non-empty) peer identity means that
//  attached () registered this session under that identity, so the
//  registration has to be withdrawn here.
zmq::connect_session_t::~connect_session_t ()
{
    //  Never connected, or connected without a stored identity: there is
    //  no registration to undo.
    if (!connected || peer_identity.empty ())
        return;

    unregister_session (peer_identity);
}

void zmq::connect_session_t::process_plug ()
{
    //  Start connection process immediately.
45
    start_connecting (false);
46 47
}

48
void zmq::connect_session_t::start_connecting (bool wait_)
49
{
50 51 52 53 54
    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

55 56 57
    //  Create the connecter object.

    //  Both TCP and IPC transports are using the same infrastructure.
58 59
    if (protocol == "tcp" || protocol == "ipc") {

60
        zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
61 62
            io_thread, this, options, protocol.c_str (), address.c_str (),
            wait_);
63
        alloc_assert (connecter);
64 65 66 67 68 69 70
        launch_child (connecter);
        return;
    }

#if defined ZMQ_HAVE_OPENPGM

    //  Both PGM and EPGM transports are using the same infrastructure.
71
    if (protocol == "pgm" || protocol == "epgm") {
72 73

        //  For EPGM transport with UDP encapsulation of PGM is used.
74
        bool udp_encapsulation = (protocol == "epgm");
75 76 77 78

        //  At this point we'll create message pipes to the session straight
        //  away. There's no point in delaying it as no concept of 'connect'
        //  exists with PGM anyway.
79
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
80 81 82

            //  PGM sender.
            pgm_sender_t *pgm_sender =  new (std::nothrow) pgm_sender_t (
83
                io_thread, options);
84
            alloc_assert (pgm_sender);
85

86 87
            int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
            zmq_assert (rc == 0);
88 89 90

            send_attach (this, pgm_sender, blob_t ());
        }
91
        else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
92 93 94

            //  PGM receiver.
            pgm_receiver_t *pgm_receiver =  new (std::nothrow) pgm_receiver_t (
95
                io_thread, options);
96
            alloc_assert (pgm_receiver);
97

98 99
            int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
            zmq_assert (rc == 0);
100 101 102 103 104 105 106 107 108 109 110 111 112

            send_attach (this, pgm_receiver, blob_t ());
        }
        else
            zmq_assert (false);

        return;
    }
#endif

    zmq_assert (false);
}

113
bool zmq::connect_session_t::attached (const blob_t &peer_identity_)
114
{
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
    //  If there was no previous connection...
    if (!connected) {

        //  Peer has transient identity.
        if (peer_identity_.empty () || peer_identity_ [0] == 0) {
            connected = true;
            return true;
        }

        //  Peer has strong identity. Let's register it and check whether noone
        //  else is using the same identity.
        if (!register_session (peer_identity_, this)) {
            log ("DPID: duplicate peer identity - disconnecting peer");
            return false;
        }
        connected = true;
        peer_identity = peer_identity_;
        return true;
    }

    //  New engine from listener can conflict with existing engine.
    //  Alternatively, new engine created by reconnection process can
    //  conflict with engine supplied by listener in the meantime.
    if (has_engine ()) {
        log ("DPID: duplicate peer identity - disconnecting peer");
        return false;
    }

    //  If there have been a connection before, we have to check whether
    //  peer's identity haven't changed in the meantime.
    if ((peer_identity_.empty () || peer_identity_ [0] == 0) &&
          peer_identity.empty ())
        return true;
    if (peer_identity != peer_identity_) {
        log ("CHID: peer have changed identity - disconnecting peer");
        return false;
    }
    return true;
153
}
154

155 156
void zmq::connect_session_t::detached ()
{
157
    //  Reconnect.
158
    start_connecting (true);
159 160
}