session.cpp 6.76 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2010 iMatix Corporation
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 21
#include <new>

22
#include "session.hpp"
23
#include "i_engine.hpp"
24
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
25
#include "pipe.hpp"
26

27
//  Constructs a session that is identified by an ordinal obtained from the
//  owning socket. The peer identity is left default-constructed, no pipes
//  or engine are attached yet and the session starts out active.
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
      const options_t &options_) :
    owned_t (parent_, owner_),
    in_pipe (NULL),
    active (true),
    out_pipe (NULL),
    engine (NULL),
    options (options_)
{
    //  Registering the session this early is safe. It is looked up only
    //  on reconnect, so it cannot be found before it is plugged into its
    //  I/O thread, i.e. there is no race condition.
    ordinal = owner->register_session (this);
}

//  Constructs a session that is identified by the identity of its peer
//  rather than by an ordinal (ordinal is set to zero). The session is
//  registered with the owner under that identity unless the identity is
//  empty or starts with a zero byte.
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
      const options_t &options_, const blob_t &peer_identity_) :
    owned_t (parent_, owner_),
    in_pipe (NULL),
    active (true),
    out_pipe (NULL),
    engine (NULL),
    ordinal (0),
    peer_identity (peer_identity_),
    options (options_)
{
    //  Identities that are empty or begin with a zero byte are not
    //  registered by name.
    const bool registrable =
        !peer_identity.empty () && peer_identity [0] != 0;
    if (!registrable)
        return;

    if (owner->register_session (peer_identity, this))
        return;

    //  TODO: There's already a session with the specified
    //  identity. We should presumably syslog it and drop the
    //  session.
    zmq_assert (false);
}

//  Destroys the session. Both pipe pointers must already have been
//  cleared by the time the destructor runs.
zmq::session_t::~session_t ()
{
    //  Inbound pipe has to be gone.
    zmq_assert (!in_pipe);

    //  Outbound pipe has to be gone as well.
    zmq_assert (!out_pipe);
}

70
//  Reads one message from the inbound pipe into msg_. Returns false when
//  there is no inbound pipe or the session is not active; otherwise returns
//  whatever the pipe's read reports.
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
    if (in_pipe && active)
        return in_pipe->read (msg_);

    return false;
}

78
//  Writes msg_ to the outbound pipe. Returns true if the pipe accepted the
//  message, false if there is no outbound pipe or the pipe refused it.
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
    if (!out_pipe || !out_pipe->write (msg_))
        return false;

    //  The message was accepted by the pipe; re-initialise the caller's
    //  message structure (presumably because the pipe now holds the
    //  content -- confirm against the pipe implementation).
    zmq_msg_init (msg_);
    return true;
}

void zmq::session_t::flush ()
{
90 91
    if (out_pipe)
        out_pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
92 93
}

94
//  Called when the engine detaches from the session. If reconnecter_ is
//  not NULL it is plugged in and handed over to the owner. Sessions that
//  have neither an ordinal nor a non-temporary peer identity are
//  terminated.
void zmq::session_t::detach (owned_t *reconnecter_)
{
    //  Plug in the reconnecter object if any.
    if (reconnecter_) {
        send_plug (reconnecter_);
        send_own (owner, reconnecter_);
    }

    //  Engine is terminating itself. No need to deallocate it from here.
    engine = NULL;

    //  Sessions registered by ordinal are kept.
    if (ordinal)
        return;

    //  So are sessions with an identity that is non-empty and does not
    //  start with a zero byte. Anything else is a transient session and
    //  is terminated.
    const bool has_lasting_identity =
        !peer_identity.empty () && peer_identity [0] != 0;
    if (!has_lasting_identity)
        term ();
}

110 111 112 113 114 115 116 117 118 119
//  Returns the I/O thread selected by choose_io_thread for the affinity
//  stored in the session's options.
zmq::io_thread_t *zmq::session_t::get_io_thread ()
{
    return choose_io_thread (options.affinity);
}

//  Returns the socket that owns this session.
class zmq::socket_base_t *zmq::session_t::get_owner ()
{
    return owner;
}

120
uint64_t zmq::session_t::get_ordinal ()
121
{
122 123
    zmq_assert (ordinal);
    return ordinal;
124 125
}

126
void zmq::session_t::attach_pipes (class reader_t *inpipe_,
127
    class writer_t *outpipe_, const blob_t &peer_identity_)
128
{
129 130 131 132 133 134 135 136 137 138 139 140
    if (inpipe_) {
        zmq_assert (!in_pipe);
        in_pipe = inpipe_;
        active = true;
        in_pipe->set_endpoint (this);
    }

    if (outpipe_) {
        zmq_assert (!out_pipe);
        out_pipe = outpipe_;
        out_pipe->set_endpoint (this);
    }
141
}
142

143
//  Forgets the inbound pipe and marks the session inactive. pipe_ is not
//  inspected.
void zmq::session_t::detach_inpipe (reader_t *pipe_)
{
    in_pipe = NULL;
    active = false;
}

149
//  Forgets the outbound pipe. pipe_ is not inspected.
void zmq::session_t::detach_outpipe (writer_t *pipe_)
{
    //  From now on write and flush see no outbound pipe.
    out_pipe = NULL;
}

154
//  Marks the session inactive so that read returns false until the
//  session is revived. The inbound pipe pointer itself is kept. pipe_ is
//  not inspected.
void zmq::session_t::kill (reader_t *pipe_)
{
    active = false;
}

159
//  Marks the session active again and, if an engine is attached, revives
//  the engine as well. pipe_ must be the session's inbound pipe (asserted).
void zmq::session_t::revive (reader_t *pipe_)
{
    zmq_assert (in_pipe == pipe_);
    active = true;

    //  Without an engine there is nobody to notify.
    if (!engine)
        return;

    engine->revive ();
}

167 168
//  Handler for the 'plug' command. The session performs no work of its
//  own at this point.
void zmq::session_t::process_plug ()
{
    //  Intentionally empty.
}

void zmq::session_t::process_unplug ()
{
    //  Unregister the session from the socket.
    if (ordinal)
        owner->unregister_session (ordinal);
176
    else if (!peer_identity.empty () && peer_identity [0] != 0)
177
        owner->unregister_session (peer_identity);
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196

    //  Ask associated pipes to terminate.
    if (in_pipe) {
        in_pipe->term ();
        in_pipe = NULL;
    }
    if (out_pipe) {
        out_pipe->term ();
        out_pipe = NULL;
    }

    if (engine) {
        engine->unplug ();
        delete engine;
        engine = NULL;
    }
}

void zmq::session_t::process_attach (i_engine *engine_,
197
    const blob_t &peer_identity_)
198
{
199
    if (!peer_identity.empty ()) {
200

201 202 203 204 205 206 207 208 209
        //  If both IDs are temporary, no checking is needed.
        //  TODO: Old ID should be reused in this case...
        if (peer_identity.empty () || peer_identity [0] != 0 ||
            peer_identity_.empty () || peer_identity_ [0] != 0) {

            //  If we already know the peer name do nothing, just check whether
            //  it haven't changed.
            zmq_assert (peer_identity == peer_identity_);
        }
210
    }
211
    else if (!peer_identity_.empty ()) {
212

213 214
        //  Store the peer identity.
        peer_identity = peer_identity_;
215 216 217 218

        //  If the session is not registered with the ordinal, let's register
        //  it using the peer name.
        if (!ordinal) {
219
            if (!owner->register_session (peer_identity, this)) {
220 221 222 223 224 225 226

                //  TODO: There's already a session with the specified
                //  identity. We should presumably syslog it and drop the
                //  session.
                zmq_assert (false);
            }
        }
227
    }
228

229 230 231 232 233 234 235 236 237 238 239 240 241
    //  Check whether the required pipes already exist. If not so, we'll
    //  create them and bind them to the socket object.
    reader_t *socket_reader = NULL;
    writer_t *socket_writer = NULL;

    if (options.requires_in && !out_pipe) {
        pipe_t *pipe = new (std::nothrow) pipe_t (owner, this,
            options.hwm, options.lwm);
        zmq_assert (pipe);
        out_pipe = &pipe->writer;
        out_pipe->set_endpoint (this);
        socket_reader = &pipe->reader;
    }
242

243 244 245 246 247 248 249
    if (options.requires_out && !in_pipe) {
        pipe_t *pipe = new (std::nothrow) pipe_t (this, owner,
            options.hwm, options.lwm);
        zmq_assert (pipe);
        in_pipe = &pipe->reader;
        in_pipe->set_endpoint (this);
        socket_writer = &pipe->writer;
Martin Sustrik's avatar
Martin Sustrik committed
250
    }
251

252 253 254
    if (socket_reader || socket_writer)
        send_bind (owner, socket_reader, socket_writer, peer_identity);

255
    //  Plug in the engine.
256
    zmq_assert (!engine);
257 258 259
    zmq_assert (engine_);
    engine = engine_;
    engine->plug (this);
260 261 262 263 264

    //  Once the initial handshaking is over tracerouting should trim prefixes
    //  from outbound messages.
    if (options.traceroute)
        engine->trim_prefix ();
265
}