zmq_init.cpp 6.43 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2010 iMatix Corporation
3 4 5 6

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
7
    the terms of the GNU Lesser General Public License as published by
8 9 10 11 12 13
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
    GNU Lesser General Public License for more details.
15

16
    You should have received a copy of the GNU Lesser General Public License
17 18 19
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 21
#include <string.h>

22
#include "zmq_init.hpp"
23 24 25
#include "transient_session.hpp"
#include "named_session.hpp"
#include "socket_base.hpp"
26 27 28
#include "zmq_engine.hpp"
#include "io_thread.hpp"
#include "session.hpp"
29
#include "uuid.hpp"
30
#include "blob.hpp"
31 32
#include "err.hpp"

33 34 35
zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
      socket_base_t *socket_, session_t *session_, fd_t fd_,
      const options_t &options_) :
36
    own_t (io_thread_, options_),
37
    ephemeral_engine (NULL),
38 39
    sent (false),
    received (false),
40 41 42
    socket (socket_),
    session (session_),
    io_thread (io_thread_)
43 44
{
    //  Create the engine object for this connection.
45
    engine = new (std::nothrow) zmq_engine_t (fd_, options);
46 47 48 49 50 51
    zmq_assert (engine);
}

zmq::zmq_init_t::~zmq_init_t ()
{
    if (engine)
52
        engine->terminate ();
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
}

bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
{
    //  If the identity was already sent, do nothing.
    if (sent)
        return false;

    //  Send the identity.
    int rc = zmq_msg_init_size (msg_, options.identity.size ());
    zmq_assert (rc == 0);
    memcpy (zmq_msg_data (msg_), options.identity.c_str (),
        options.identity.size ());
    sent = true;

68
    //  Try finalize initialization.
69
    finalise_initialisation ();
70 71 72 73 74 75 76 77 78 79 80

    return true;
}

bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
{
    //  If identity was already received, we are not interested
    //  in subsequent messages.
    if (received)
        return false;

81 82
    //  Retreieve the remote identity. If it's empty, generate a unique name.
    if (!zmq_msg_size (msg_)) {
83
        unsigned char identity [uuid_t::uuid_blob_len + 1];
84
        identity [0] = 0;
85 86
        memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len);
        peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
87 88 89 90 91
    }
    else {
        peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
            zmq_msg_size (msg_));
    }
92

93 94
    received = true;

95 96 97
    //  Try finalize initialization.
    finalise_initialisation ();

98 99 100 101 102 103 104 105 106
    return true;
}

void zmq::zmq_init_t::flush ()
{
    //  Check if there's anything to flush.
    if (!received)
        return;

107 108 109
    //  Initialization is done, dispatch engine.
    if (ephemeral_engine)
        dispatch_engine ();
110 111
}

112
void zmq::zmq_init_t::detach ()
113 114 115
{
    //  This function is called by engine when disconnection occurs.

116 117 118 119
    //  If there is an associated session, send it a null engine to let it know
    //  that connection process was unsuccesful.
    if (session)
        send_attach (session, NULL, blob_t (), true);
120 121 122 123

    //  The engine will destroy itself, so let's just drop the pointer here and
    //  start termination of the init object.
    engine = NULL;
124
    terminate ();
125 126 127 128 129
}

void zmq::zmq_init_t::process_plug ()
{
    zmq_assert (engine);
130
    engine->plug (io_thread, this);
131 132 133 134 135 136 137 138
}

void zmq::zmq_init_t::process_unplug ()
{
    if (engine)
        engine->unplug ();
}

139
void zmq::zmq_init_t::finalise_initialisation ()
140 141 142 143 144 145 146 147 148 149 150
{
     //  Unplug and prepare to dispatch engine.
     if (sent && received) {
        ephemeral_engine = engine;
        engine = NULL;
        ephemeral_engine->unplug ();
        return;
    }
}

void zmq::zmq_init_t::dispatch_engine ()
151 152 153
{
    if (sent && received) {

154 155 156 157
        //  Engine must be detached.
        zmq_assert (!engine);
        zmq_assert (ephemeral_engine);

158
        //  If we know what session we belong to, it's easy, just send the
159 160 161 162
        //  engine to that session and destroy the init object. Note that we
        //  know about the session only if this object is owned by it. Thus,
        //  lifetime of this object in contained in the lifetime of the session
        //  so the pointer cannot become invalid without notice.
163
        if (session) {
164
            send_attach (session, ephemeral_engine, peer_identity, true);
165 166 167
            terminate ();
            return;
        }
168

169 170 171 172 173
        //  All the cases below are listener-based. Therefore we need the socket
        //  reference so that new sessions can bind to that socket.
        zmq_assert (socket);

        //  We have no associated session. If the peer has no identity we'll
174 175 176 177
        //  create a transient session for the connection. Note that
        //  seqnum is incremented to account for attach command before the
        //  session is launched. That way we are sure it won't terminate before
        //  being attached.
178 179 180 181
        if (peer_identity [0] == 0) {
            session = new (std::nothrow) transient_session_t (io_thread,
                socket, options);
            zmq_assert (session);
182
            session->inc_seqnum ();
183
            launch_sibling (session);
184
            send_attach (session, ephemeral_engine, peer_identity, false);
185 186
            terminate ();
            return;
187
        }
188 189 190 191 192 193 194
        
        //  Try to find the session corresponding to the peer's identity.
        //  If found, send the engine to that session and destroy this object.
        //  Note that session's seqnum is incremented by find_session rather
        //  than by send_attach.
        session = socket->find_session (peer_identity);
        if (session) {
195
            send_attach (session, ephemeral_engine, peer_identity, false);
196 197
            terminate ();
            return;
198 199
        }

200 201 202 203
        //  There's no such named session. We have to create one. Note that
        //  seqnum is incremented to account for attach command before the
        //  session is launched. That way we are sure it won't terminate before
        //  being attached.
204 205
        session = new (std::nothrow) named_session_t (io_thread, socket,
            options, peer_identity);
206
        zmq_assert (session);
207
        session->inc_seqnum ();
208
        launch_sibling (session);
209
        send_attach (session, ephemeral_engine, peer_identity, false);
210 211
        terminate ();
        return;
212 213
    }
}