/*
    Copyright (c) 2007-2010 iMatix Corporation

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 21
#include <new>

22
#include "session.hpp"
23
#include "i_engine.hpp"
24
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
25
#include "pipe.hpp"
26

27
//  Constructs an unnamed session. Both pipes and the engine start out
//  unset, the session is marked active, and it is registered with the
//  owner straight away; the value returned by the registration is kept
//  as the session's ordinal.
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
      const options_t &options_) :
    owned_t (parent_, owner_),
    in_pipe (NULL),
    active (true),
    out_pipe (NULL),
    engine (NULL),
    options (options_)
{
    type = unnamed;

    //  Registering at this point is fine even though the session is not
    //  yet plugged into its I/O thread: the session is searched for only
    //  on reconnect, so it cannot be found before it is plugged in and
    //  no race condition is possible.
    ordinal = owner->register_session (this);
}

//  Constructs either a named session (name_ is non-NULL) or a transient
//  one (name_ is NULL). Unlike the unnamed variant, nothing is registered
//  with the owner here and the ordinal is always set to zero.
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
      const options_t &options_, const char *name_) :
    owned_t (parent_, owner_),
    in_pipe (NULL),
    active (true),
    out_pipe (NULL),
    engine (NULL),
    options (options_)
{
    //  Neither kind of session created here gets an ordinal.
    ordinal = 0;

    if (!name_) {
        //  TODO: Generate unique name here.
        type = transient;
        return;
    }

    //  Remember the caller-supplied name.
    type = named;
    name = name_;
}

//  Destroys the session. Both pipe pointers are expected to have been
//  cleared by now; the assertions below enforce that.
zmq::session_t::~session_t ()
{
    zmq_assert (!in_pipe);
    zmq_assert (!out_pipe);
}

71
//  Reads a message from the inbound pipe into msg_. Returns false without
//  touching the pipe when there is no inbound pipe or the session is not
//  active; otherwise returns whatever the pipe's read reports.
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
    if (in_pipe && active)
        return in_pipe->read (msg_);

    return false;
}

79
//  Writes msg_ to the outbound pipe. Returns false when there is no
//  outbound pipe or the pipe's write reports failure. On success msg_ is
//  re-initialised with zmq_msg_init and true is returned.
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
    if (!out_pipe || !out_pipe->write (msg_))
        return false;

    //  The write succeeded; hand a freshly initialised message back to
    //  the caller.
    zmq_msg_init (msg_);
    return true;
}

void zmq::session_t::flush ()
{
91 92
    if (out_pipe)
        out_pipe->flush ();
Martin Sustrik's avatar
Martin Sustrik committed
93 94
}

95
//  Called when the engine detaches from the session. reconnecter_ may be
//  NULL; when it is supplied it is plugged in and handed over to the
//  owner. Transient sessions terminate themselves afterwards.
void zmq::session_t::detach (owned_t *reconnecter_)
{
    //  If a reconnecter object was passed in, plug it in and transfer
    //  its ownership to the owner.
    if (reconnecter_ != NULL) {
        send_plug (reconnecter_);
        send_own (owner, reconnecter_);
    }

    //  The engine is terminating itself, so it must not be deallocated
    //  from here; just forget the pointer.
    engine = NULL;

    //  Only transient sessions are terminated at this point.
    if (type != transient)
        return;

    term ();
}

111 112 113 114 115 116 117 118 119 120
//  Returns the I/O thread chosen by choose_io_thread for the affinity
//  stored in the session's options.
zmq::io_thread_t *zmq::session_t::get_io_thread ()
{
    zmq::io_thread_t *chosen = choose_io_thread (options.affinity);
    return chosen;
}

class zmq::socket_base_t *zmq::session_t::get_owner ()
{
    return owner;
}

121
uint64_t zmq::session_t::get_ordinal ()
122
{
123 124 125
    zmq_assert (type == unnamed);
    zmq_assert (ordinal);
    return ordinal;
126 127
}

128 129
void zmq::session_t::attach_pipes (class reader_t *inpipe_,
    class writer_t *outpipe_)
130
{
131 132 133 134 135 136 137 138 139 140 141 142
    if (inpipe_) {
        zmq_assert (!in_pipe);
        in_pipe = inpipe_;
        active = true;
        in_pipe->set_endpoint (this);
    }

    if (outpipe_) {
        zmq_assert (!out_pipe);
        out_pipe = outpipe_;
        out_pipe->set_endpoint (this);
    }
143
}
144

145
//  Drops the inbound pipe and marks the session inactive. The pipe_
//  argument is not inspected.
void zmq::session_t::detach_inpipe (reader_t *pipe_)
{
    in_pipe = NULL;
    active = false;
}

151
//  Drops the outbound pipe. The pipe_ argument is not inspected.
void zmq::session_t::detach_outpipe (writer_t *pipe_)
{
    out_pipe = NULL;
}

156
//  Marks the session inactive; read () returns false until the session
//  is made active again. The pipe_ argument is not inspected.
void zmq::session_t::kill (reader_t *pipe_)
{
    active = false;
}

161
//  Marks the session active again and, if an engine is attached, revives
//  the engine as well. pipe_ must be the session's current inbound pipe
//  (asserted).
void zmq::session_t::revive (reader_t *pipe_)
{
    zmq_assert (in_pipe == pipe_);
    active = true;

    //  Without an engine there is nothing further to do.
    if (!engine)
        return;

    engine->revive ();
}

169 170
void zmq::session_t::process_plug ()
{
171
    //  Register the session with the socket.
172 173
    if (!name.empty ()) {
        bool ok = owner->register_session (name.c_str (), this);
174

175 176 177 178
        //  There's already a session with the specified identity.
        //  We should syslog it and drop the session. TODO
        zmq_assert (ok);
    }
179

Martin Sustrik's avatar
Martin Sustrik committed
180 181 182 183
    //  If session is created by 'connect' function, it has the pipes set
    //  already. Otherwise, it's being created by the listener and the pipes
    //  are yet to be created.
    if (!in_pipe && !out_pipe) {
184 185 186 187 188

        pipe_t *inbound = NULL;
        pipe_t *outbound = NULL;

        if (options.requires_out) {
189 190
            inbound = new (std::nothrow) pipe_t (this, owner,
                options.hwm, options.lwm);
191 192 193 194 195 196
            zmq_assert (inbound);
            in_pipe = &inbound->reader;
            in_pipe->set_endpoint (this);
        }

        if (options.requires_in) {
197 198
            outbound = new (std::nothrow) pipe_t (owner, this,
                options.hwm, options.lwm);
199 200 201 202 203
            zmq_assert (outbound);
            out_pipe = &outbound->writer;
            out_pipe->set_endpoint (this);
        }

204
        send_bind (owner, outbound ? &outbound->reader : NULL,
205
            inbound ? &inbound->writer : NULL);
Martin Sustrik's avatar
Martin Sustrik committed
206
    }
207 208 209 210
}

void zmq::session_t::process_unplug ()
{
211 212 213 214 215 216
    //  Unregister the session from the socket. There's nothing to do here
    //  for transient sessions.
    if (type == unnamed)
        owner->unregister_session (ordinal);
    else if (type == named)
        owner->unregister_session (name.c_str ());
217

218 219 220 221 222 223 224 225 226 227
    //  Ask associated pipes to terminate.
    if (in_pipe) {
        in_pipe->term ();
        in_pipe = NULL;
    }
    if (out_pipe) {
        out_pipe->term ();
        out_pipe = NULL;
    }

228 229 230 231 232 233 234
    if (engine) {
        engine->unplug ();
        delete engine;
        engine = NULL;
    }
}

235
//  Handles the attach command: stores engine_ as the session's engine and
//  plugs it into this session. The session must not have an engine yet
//  and engine_ must not be NULL (both asserted).
void zmq::session_t::process_attach (i_engine *engine_)
{
    zmq_assert (!engine);
    zmq_assert (engine_);

    engine = engine_;
    engine->plug (this);
}