object.cpp 8.96 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2010 iMatix Corporation
Martin Sustrik's avatar
Martin Sustrik committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 21
#include <string.h>

Martin Sustrik's avatar
Martin Sustrik committed
22
#include "object.hpp"
23
#include "dispatcher.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
24
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
25
#include "pipe.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
26 27
#include "io_thread.hpp"
#include "simple_semaphore.hpp"
28 29 30
#include "owned.hpp"
#include "session.hpp"
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
31

32 33
zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) :
    dispatcher (dispatcher_),
Martin Sustrik's avatar
Martin Sustrik committed
34 35 36 37
    thread_slot (thread_slot_)
{
}

Martin Sustrik's avatar
Martin Sustrik committed
38
zmq::object_t::object_t (object_t *parent_) :
39
    dispatcher (parent_->dispatcher),
Martin Sustrik's avatar
Martin Sustrik committed
40 41 42 43
    thread_slot (parent_->thread_slot)
{
}

Martin Sustrik's avatar
Martin Sustrik committed
44
zmq::object_t::~object_t ()
Martin Sustrik's avatar
Martin Sustrik committed
45 46 47
{
}

Martin Sustrik's avatar
Martin Sustrik committed
48
int zmq::object_t::thread_slot_count ()
Martin Sustrik's avatar
Martin Sustrik committed
49
{
50
    return dispatcher->thread_slot_count ();
Martin Sustrik's avatar
Martin Sustrik committed
51 52
}

Martin Sustrik's avatar
Martin Sustrik committed
53
int zmq::object_t::get_thread_slot ()
Martin Sustrik's avatar
Martin Sustrik committed
54 55 56 57
{
    return thread_slot;
}

58 59 60 61 62
zmq::dispatcher_t *zmq::object_t::get_dispatcher ()
{
    return dispatcher;
}

Martin Sustrik's avatar
Martin Sustrik committed
63
void zmq::object_t::process_command (command_t &cmd_)
Martin Sustrik's avatar
Martin Sustrik committed
64 65 66
{
    switch (cmd_.type) {

Martin Sustrik's avatar
Martin Sustrik committed
67 68 69 70
    case command_t::revive:
        process_revive ();
        break;

71 72
    case command_t::stop:
        process_stop ();
Martin Sustrik's avatar
Martin Sustrik committed
73 74
        break;

75 76
    case command_t::plug:
        process_plug ();
77
        process_seqnum ();
78
        return;
Martin Sustrik's avatar
Martin Sustrik committed
79

80 81
    case command_t::own:
        process_own (cmd_.args.own.object);
82
        break;
Martin Sustrik's avatar
Martin Sustrik committed
83

84
    case command_t::attach:
85
        process_attach (cmd_.args.attach.engine,
86 87
            blob_t (cmd_.args.attach.peer_identity,
            cmd_.args.attach.peer_identity_size));
88
        process_seqnum ();
89
        break;
90

Martin Sustrik's avatar
Martin Sustrik committed
91
    case command_t::bind:
92 93 94
        process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
            blob_t (cmd_.args.bind.peer_identity,
            cmd_.args.bind.peer_identity_size));
95
        process_seqnum ();
96
        break;
Martin Sustrik's avatar
Martin Sustrik committed
97

Martin Hurton's avatar
Martin Hurton committed
98 99 100 101
    case command_t::reader_info:
        process_reader_info (cmd_.args.reader_info.msgs_read);
        break;

Martin Sustrik's avatar
Martin Sustrik committed
102 103 104 105 106 107
    case command_t::pipe_term:
        process_pipe_term ();
        return;

    case command_t::pipe_term_ack:
        process_pipe_term_ack ();
108
        break;
Martin Sustrik's avatar
Martin Sustrik committed
109

110 111
    case command_t::term_req:
        process_term_req (cmd_.args.term_req.object);
112
        break;
113 114 115
    
    case command_t::term:
        process_term ();
116
        break;
Martin Sustrik's avatar
Martin Sustrik committed
117

118 119
    case command_t::term_ack:
        process_term_ack ();
120
        break;
Martin Sustrik's avatar
Martin Sustrik committed
121 122

    default:
Martin Sustrik's avatar
Martin Sustrik committed
123
        zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
124
    }
125 126 127 128

    //  The assumption here is that each command is processed once only,
    //  so deallocating it after processing is all right.
    deallocate_command (&cmd_);
Martin Sustrik's avatar
Martin Sustrik committed
129 130
}

Martin Sustrik's avatar
Martin Sustrik committed
131 132 133 134 135 136 137 138 139 140
void zmq::object_t::register_pipe (class pipe_t *pipe_)
{
    dispatcher->register_pipe (pipe_);
}

void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
{
    dispatcher->unregister_pipe (pipe_);
}

141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_)
{
    return dispatcher->register_endpoint (addr_, socket_);
}

void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
{
    return dispatcher->unregister_endpoints (socket_);
}

zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_)
{
    return dispatcher->find_endpoint (addr_);
}

Martin Sustrik's avatar
Martin Sustrik committed
156
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
Martin Sustrik's avatar
Martin Sustrik committed
157
{
158
    return dispatcher->choose_io_thread (taskset_);
Martin Sustrik's avatar
Martin Sustrik committed
159 160
}

Martin Sustrik's avatar
Martin Sustrik committed
161
void zmq::object_t::send_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
162
{
163 164 165
    //  'stop' command goes always from administrative thread to
    //  the current object. 
    int admin_thread_id = dispatcher->thread_slot_count () - 1;
Martin Sustrik's avatar
Martin Sustrik committed
166 167 168
    command_t cmd;
    cmd.destination = this;
    cmd.type = command_t::stop;
169
    dispatcher->write (admin_thread_id, thread_slot, cmd);
Martin Sustrik's avatar
Martin Sustrik committed
170 171
}

172
void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
Martin Sustrik's avatar
Martin Sustrik committed
173
{
174 175
    if (inc_seqnum_)
        destination_->inc_seqnum ();
176

Martin Sustrik's avatar
Martin Sustrik committed
177 178
    command_t cmd;
    cmd.destination = destination_;
179
    cmd.type = command_t::plug;
Martin Sustrik's avatar
Martin Sustrik committed
180 181 182
    send_command (cmd);
}

183
void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
Martin Sustrik's avatar
Martin Sustrik committed
184 185 186
{
    command_t cmd;
    cmd.destination = destination_;
187 188
    cmd.type = command_t::own;
    cmd.args.own.object = object_;
Martin Sustrik's avatar
Martin Sustrik committed
189 190 191
    send_command (cmd);
}

192
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
193
    const blob_t &peer_identity_, bool inc_seqnum_)
194
{
195 196 197
    if (inc_seqnum_)
        destination_->inc_seqnum ();

198 199 200 201
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::attach;
    cmd.args.attach.engine = engine_;
202
    if (peer_identity_.empty ()) {
203 204 205 206
        cmd.args.attach.peer_identity_size = 0;
        cmd.args.attach.peer_identity = NULL;
    }
    else {
207 208 209
        zmq_assert (peer_identity_.size () <= 0xff);
        cmd.args.attach.peer_identity_size =
            (unsigned char) peer_identity_.size ();
210
        cmd.args.attach.peer_identity =
211
            (unsigned char*) malloc (peer_identity_.size ());
212
        zmq_assert (cmd.args.attach.peer_identity_size);
213 214
        memcpy (cmd.args.attach.peer_identity, peer_identity_.data (),
            peer_identity_.size ());
215
    }
216 217 218
    send_command (cmd);
}

219
void zmq::object_t::send_bind (socket_base_t *destination_,
220 221
    reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_,
    bool inc_seqnum_)
Martin Sustrik's avatar
Martin Sustrik committed
222
{
223 224 225
    if (inc_seqnum_)
        destination_->inc_seqnum ();

Martin Sustrik's avatar
Martin Sustrik committed
226 227
    command_t cmd;
    cmd.destination = destination_;
228
    cmd.type = command_t::bind;
Martin Sustrik's avatar
Martin Sustrik committed
229 230
    cmd.args.bind.in_pipe = in_pipe_;
    cmd.args.bind.out_pipe = out_pipe_;
231 232 233 234 235 236 237 238 239 240 241 242 243 244
    if (peer_identity_.empty ()) {
        cmd.args.bind.peer_identity_size = 0;
        cmd.args.bind.peer_identity = NULL;
    }
    else {
        zmq_assert (peer_identity_.size () <= 0xff);
        cmd.args.bind.peer_identity_size =
            (unsigned char) peer_identity_.size ();
        cmd.args.bind.peer_identity =
            (unsigned char*) malloc (peer_identity_.size ());
        zmq_assert (cmd.args.bind.peer_identity_size);
        memcpy (cmd.args.bind.peer_identity, peer_identity_.data (),
            peer_identity_.size ());
    }
Martin Sustrik's avatar
Martin Sustrik committed
245 246 247 248 249 250 251 252
    send_command (cmd);
}

void zmq::object_t::send_revive (object_t *destination_)
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::revive;
Martin Sustrik's avatar
Martin Sustrik committed
253 254 255
    send_command (cmd);
}

Martin Hurton's avatar
Martin Hurton committed
256 257 258 259 260 261 262 263 264 265
void zmq::object_t::send_reader_info (writer_t *destination_,
    uint64_t msgs_read_)
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::reader_info;
    cmd.args.reader_info.msgs_read = msgs_read_;
    send_command (cmd);
}

Martin Sustrik's avatar
Martin Sustrik committed
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
void zmq::object_t::send_pipe_term (writer_t *destination_)
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::pipe_term;
    send_command (cmd);
}

void zmq::object_t::send_pipe_term_ack (reader_t *destination_)
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::pipe_term_ack;
    send_command (cmd);
}

282 283
void zmq::object_t::send_term_req (socket_base_t *destination_,
    owned_t *object_)
Martin Sustrik's avatar
Martin Sustrik committed
284 285 286
{
    command_t cmd;
    cmd.destination = destination_;
287 288
    cmd.type = command_t::term_req;
    cmd.args.term_req.object = object_;
Martin Sustrik's avatar
Martin Sustrik committed
289 290 291
    send_command (cmd);
}

292
void zmq::object_t::send_term (owned_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
293 294 295
{
    command_t cmd;
    cmd.destination = destination_;
296
    cmd.type = command_t::term;
Martin Sustrik's avatar
Martin Sustrik committed
297 298 299
    send_command (cmd);
}

300
void zmq::object_t::send_term_ack (socket_base_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
301 302 303
{
    command_t cmd;
    cmd.destination = destination_;
304
    cmd.type = command_t::term_ack;
Martin Sustrik's avatar
Martin Sustrik committed
305 306 307
    send_command (cmd);
}

Martin Sustrik's avatar
Martin Sustrik committed
308
void zmq::object_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
309
{
Martin Sustrik's avatar
Martin Sustrik committed
310
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
311 312
}

313
void zmq::object_t::process_plug ()
Martin Sustrik's avatar
Martin Sustrik committed
314
{
Martin Sustrik's avatar
Martin Sustrik committed
315
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
316 317
}

318 319 320 321 322
void zmq::object_t::process_own (owned_t *object_)
{
    zmq_assert (false);
}

323
void zmq::object_t::process_attach (i_engine *engine_,
324
    const blob_t &peer_identity_)
Martin Sustrik's avatar
Martin Sustrik committed
325
{
Martin Sustrik's avatar
Martin Sustrik committed
326
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
327 328
}

329 330
void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
    const blob_t &peer_identity_)
Martin Sustrik's avatar
Martin Sustrik committed
331 332 333 334 335
{
    zmq_assert (false);
}

void zmq::object_t::process_revive ()
Martin Sustrik's avatar
Martin Sustrik committed
336
{
Martin Sustrik's avatar
Martin Sustrik committed
337
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
338 339
}

Martin Hurton's avatar
Martin Hurton committed
340 341 342 343 344
void zmq::object_t::process_reader_info (uint64_t msgs_read_)
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
345 346 347 348 349 350 351 352 353 354
void zmq::object_t::process_pipe_term ()
{
    zmq_assert (false);
}

void zmq::object_t::process_pipe_term_ack ()
{
    zmq_assert (false);
}

355
void zmq::object_t::process_term_req (owned_t *object_)
Martin Sustrik's avatar
Martin Sustrik committed
356
{
Martin Sustrik's avatar
Martin Sustrik committed
357
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
358 359
}

360
void zmq::object_t::process_term ()
Martin Sustrik's avatar
Martin Sustrik committed
361
{
Martin Sustrik's avatar
Martin Sustrik committed
362
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
363 364
}

365
void zmq::object_t::process_term_ack ()
Martin Sustrik's avatar
Martin Sustrik committed
366
{
Martin Sustrik's avatar
Martin Sustrik committed
367
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
368 369
}

370 371 372 373 374
void zmq::object_t::process_seqnum ()
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
375
void zmq::object_t::send_command (command_t &cmd_)
Martin Sustrik's avatar
Martin Sustrik committed
376 377
{
    int destination_thread_slot = cmd_.destination->get_thread_slot ();
378
    dispatcher->write (thread_slot, destination_thread_slot, cmd_);
Martin Sustrik's avatar
Martin Sustrik committed
379 380
}