object.cpp 8.21 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2009 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

22
#include <string.h>
23
#include <stdarg.h>
24

Martin Sustrik's avatar
Martin Sustrik committed
25
#include "object.hpp"
26
#include "ctx.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
27
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
28
#include "pipe.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
29
#include "io_thread.hpp"
30
#include "session_base.hpp"
31
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
32

Martin Sustrik's avatar
Martin Sustrik committed
33
//  Constructs an object from an explicit context pointer and thread ID.
//  Both arguments are stored as given; the body does nothing else.
zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) : ctx (ctx_), tid (tid_)
{
}

Martin Sustrik's avatar
Martin Sustrik committed
39
//  Constructs an object that copies the context pointer and thread ID
//  of its parent. parent_ is dereferenced unconditionally, so it must
//  not be NULL.
zmq::object_t::object_t (object_t *parent_) :
    ctx (parent_->ctx), tid (parent_->tid)
{
}

Martin Sustrik's avatar
Martin Sustrik committed
45
//  The destructor body is empty; no cleanup is performed here.
zmq::object_t::~object_t ()
{
}

Martin Sustrik's avatar
Martin Sustrik committed
49
uint32_t zmq::object_t::get_tid ()
Martin Sustrik's avatar
Martin Sustrik committed
50
{
Martin Sustrik's avatar
Martin Sustrik committed
51
    return tid;
Martin Sustrik's avatar
Martin Sustrik committed
52 53
}

54
//  Returns the context pointer held in this object's 'ctx' member.
zmq::ctx_t *zmq::object_t::get_ctx ()
{
    return this->ctx;
}

Martin Sustrik's avatar
Martin Sustrik committed
59
//  Dispatches an incoming command to the matching process_* handler,
//  selected by cmd_.type, handing over the arguments stored in cmd_.args.
//  The plug, own, attach and bind commands are each followed by a call
//  to process_seqnum (). Any command type not listed below trips
//  zmq_assert.
void zmq::object_t::process_command (command_t &cmd_)
{
    switch (cmd_.type) {

    //  Commands whose handler is followed by process_seqnum ().
    case command_t::plug:
        process_plug ();
        process_seqnum ();
        break;

    case command_t::own:
        process_own (cmd_.args.own.object);
        process_seqnum ();
        break;

    case command_t::attach:
        process_attach (cmd_.args.attach.engine);
        process_seqnum ();
        break;

    case command_t::bind:
        process_bind (cmd_.args.bind.pipe);
        process_seqnum ();
        break;

    //  Pipe-related commands.
    case command_t::activate_read:
        process_activate_read ();
        break;

    case command_t::activate_write:
        process_activate_write (cmd_.args.activate_write.msgs_read);
        break;

    case command_t::hiccup:
        process_hiccup (cmd_.args.hiccup.pipe);
        break;

    case command_t::pipe_term:
        process_pipe_term ();
        break;

    case command_t::pipe_term_ack:
        process_pipe_term_ack ();
        break;

    //  Stop and termination commands.
    case command_t::stop:
        process_stop ();
        break;

    case command_t::term_req:
        process_term_req (cmd_.args.term_req.object);
        break;

    case command_t::term:
        process_term (cmd_.args.term.linger);
        break;

    case command_t::term_ack:
        process_term_ack ();
        break;

    //  Reaper commands.
    case command_t::reap:
        process_reap (cmd_.args.reap.socket);
        break;

    case command_t::reaped:
        process_reaped ();
        break;

    //  Anything else is not expected to be delivered to an object.
    default:
        zmq_assert (false);
        break;
    }
}

132
int zmq::object_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
133
{
134
    return ctx->register_endpoint (addr_, endpoint_);
135 136 137 138
}

//  Forwards the request to unregister the endpoints of the given socket
//  to the context. Nothing is returned.
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
{
    //  Plain call rather than 'return <void expression>', matching the
    //  other void forwarders in this file (e.g. destroy_socket).
    ctx->unregister_endpoints (socket_);
}

142
//  Looks up the endpoint for addr_ by delegating to the context and
//  returns whatever the context returns.
zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
{
    return this->ctx->find_endpoint (addr_);
}

147 148 149 150 151
//  Forwards the request to destroy the given socket to the context.
void zmq::object_t::destroy_socket (socket_base_t *socket_)
{
    this->ctx->destroy_socket (socket_);
}

152
//  Asks the context to pick an I/O thread for the given affinity value
//  and returns the context's answer unchanged.
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
{
    io_thread_t *const chosen = ctx->choose_io_thread (affinity_);
    return chosen;
}

Martin Sustrik's avatar
Martin Sustrik committed
157
void zmq::object_t::send_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
158
{
159 160
    //  'stop' command goes always from administrative thread to
    //  the current object. 
Martin Sustrik's avatar
Martin Sustrik committed
161 162 163
    command_t cmd;
    cmd.destination = this;
    cmd.type = command_t::stop;
Martin Sustrik's avatar
Martin Sustrik committed
164
    ctx->send_command (tid, cmd);
Martin Sustrik's avatar
Martin Sustrik committed
165 166
}

167
//  Sends the 'plug' command to destination_. When inc_seqnum_ is true
//  the destination's inc_seqnum () is called before the command is sent.
void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
{
    if (inc_seqnum_) {
        destination_->inc_seqnum ();
    }

    command_t plug_cmd;
    plug_cmd.type = command_t::plug;
    plug_cmd.destination = destination_;
    send_command (plug_cmd);
}

178
//  Sends the 'own' command to destination_, carrying object_ as its
//  argument. The destination's inc_seqnum () is always called first.
void zmq::object_t::send_own (own_t *destination_, own_t *object_)
{
    destination_->inc_seqnum ();

    command_t own_cmd;
    own_cmd.type = command_t::own;
    own_cmd.destination = destination_;
    own_cmd.args.own.object = object_;
    send_command (own_cmd);
}

188 189
//  Sends the 'attach' command to the destination session, carrying
//  engine_ as its argument. When inc_seqnum_ is true the destination's
//  inc_seqnum () is called before the command is sent.
void zmq::object_t::send_attach (session_base_t *destination_,
    i_engine *engine_, bool inc_seqnum_)
{
    if (inc_seqnum_) {
        destination_->inc_seqnum ();
    }

    command_t attach_cmd;
    attach_cmd.type = command_t::attach;
    attach_cmd.destination = destination_;
    attach_cmd.args.attach.engine = engine_;
    send_command (attach_cmd);
}

201
void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
202
    bool inc_seqnum_)
Martin Sustrik's avatar
Martin Sustrik committed
203
{
204 205 206
    if (inc_seqnum_)
        destination_->inc_seqnum ();

Martin Sustrik's avatar
Martin Sustrik committed
207 208
    command_t cmd;
    cmd.destination = destination_;
209
    cmd.type = command_t::bind;
210
    cmd.args.bind.pipe = pipe_;
Martin Sustrik's avatar
Martin Sustrik committed
211 212 213
    send_command (cmd);
}

214
//  Sends the argument-less 'activate_read' command to the given pipe.
void zmq::object_t::send_activate_read (pipe_t *destination_)
{
    command_t activate_cmd;
    activate_cmd.type = command_t::activate_read;
    activate_cmd.destination = destination_;
    send_command (activate_cmd);
}

222
void zmq::object_t::send_activate_write (pipe_t *destination_,
Martin Hurton's avatar
Martin Hurton committed
223 224 225 226
    uint64_t msgs_read_)
{
    command_t cmd;
    cmd.destination = destination_;
227 228
    cmd.type = command_t::activate_write;
    cmd.args.activate_write.msgs_read = msgs_read_;
Martin Hurton's avatar
Martin Hurton committed
229 230 231
    send_command (cmd);
}

232 233 234 235 236 237 238 239 240
//  Sends the 'hiccup' command to the given pipe, carrying the untyped
//  pipe_ pointer as its argument.
void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
{
    command_t hiccup_cmd;
    hiccup_cmd.type = command_t::hiccup;
    hiccup_cmd.destination = destination_;
    hiccup_cmd.args.hiccup.pipe = pipe_;
    send_command (hiccup_cmd);
}

241
//  Sends the argument-less 'pipe_term' command to the given pipe.
void zmq::object_t::send_pipe_term (pipe_t *destination_)
{
    command_t term_cmd;
    term_cmd.type = command_t::pipe_term;
    term_cmd.destination = destination_;
    send_command (term_cmd);
}

249
//  Sends the argument-less 'pipe_term_ack' command to the given pipe.
void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
{
    command_t ack_cmd;
    ack_cmd.type = command_t::pipe_term_ack;
    ack_cmd.destination = destination_;
    send_command (ack_cmd);
}

257 258
//  Sends the 'term_req' command to destination_, carrying object_ as
//  its argument.
void zmq::object_t::send_term_req (own_t *destination_,
    own_t *object_)
{
    command_t req_cmd;
    req_cmd.type = command_t::term_req;
    req_cmd.destination = destination_;
    req_cmd.args.term_req.object = object_;
    send_command (req_cmd);
}

267
//  Sends the 'term' command to destination_, carrying linger_ as its
//  argument.
void zmq::object_t::send_term (own_t *destination_, int linger_)
{
    command_t term_cmd;
    term_cmd.type = command_t::term;
    term_cmd.destination = destination_;
    term_cmd.args.term.linger = linger_;
    send_command (term_cmd);
}

276
//  Sends the argument-less 'term_ack' command to destination_.
void zmq::object_t::send_term_ack (own_t *destination_)
{
    command_t ack_cmd;
    ack_cmd.type = command_t::term_ack;
    ack_cmd.destination = destination_;
    send_command (ack_cmd);
}

284 285 286 287 288 289 290 291 292
void zmq::object_t::send_reap (class socket_base_t *socket_)
{
    command_t cmd;
    cmd.destination = ctx->get_reaper ();
    cmd.type = command_t::reap;
    cmd.args.reap.socket = socket_;
    send_command (cmd);
}

293 294 295 296 297 298 299 300
void zmq::object_t::send_reaped ()
{
    command_t cmd;
    cmd.destination = ctx->get_reaper ();
    cmd.type = command_t::reaped;
    send_command (cmd);
}

301 302 303 304 305 306 307 308
void zmq::object_t::send_done ()
{
    command_t cmd;
    cmd.destination = NULL;
    cmd.type = command_t::done;
    ctx->send_command (ctx_t::term_tid, cmd);
}

Martin Sustrik's avatar
Martin Sustrik committed
309
void zmq::object_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
310
{
Martin Sustrik's avatar
Martin Sustrik committed
311
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
312 313
}

314
void zmq::object_t::process_plug ()
Martin Sustrik's avatar
Martin Sustrik committed
315
{
Martin Sustrik's avatar
Martin Sustrik committed
316
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
317 318
}

319
//  Handler for the 'own' command. The object_ argument is ignored and
//  this implementation asserts unconditionally; presumably classes that
//  expect the command provide their own handler (verify against
//  object.hpp).
void zmq::object_t::process_own (own_t *object_)
{
    zmq_assert (false);
}

324
//  Handler for the 'attach' command. The engine_ argument is ignored
//  and this implementation asserts unconditionally; presumably classes
//  that expect the command provide their own handler (verify against
//  object.hpp).
void zmq::object_t::process_attach (i_engine *engine_)
{
    zmq_assert (false);
}

329
//  Handler for the 'bind' command. The pipe_ argument is ignored and
//  this implementation asserts unconditionally; presumably classes that
//  expect the command provide their own handler (verify against
//  object.hpp).
void zmq::object_t::process_bind (pipe_t *pipe_)
{
    zmq_assert (false);
}

334
void zmq::object_t::process_activate_read ()
Martin Sustrik's avatar
Martin Sustrik committed
335
{
Martin Sustrik's avatar
Martin Sustrik committed
336
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
337 338
}

339
void zmq::object_t::process_activate_write (uint64_t msgs_read_)
Martin Hurton's avatar
Martin Hurton committed
340 341 342 343
{
    zmq_assert (false);
}

344 345 346 347 348
//  Handler for the 'hiccup' command. The pipe_ argument is ignored and
//  this implementation asserts unconditionally; presumably classes that
//  expect the command provide their own handler (verify against
//  object.hpp).
void zmq::object_t::process_hiccup (void *pipe_)
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
349 350 351 352 353 354 355 356 357 358
//  Handler for the 'pipe_term' command. This implementation asserts
//  unconditionally; presumably classes that expect the command provide
//  their own handler (verify against object.hpp).
void zmq::object_t::process_pipe_term ()
{
    zmq_assert (false);
}

//  Handler for the 'pipe_term_ack' command. This implementation asserts
//  unconditionally; presumably classes that expect the command provide
//  their own handler (verify against object.hpp).
void zmq::object_t::process_pipe_term_ack ()
{
    zmq_assert (false);
}

359
//  Handler for the 'term_req' command. The object_ argument is ignored
//  and this implementation asserts unconditionally; presumably classes
//  that expect the command provide their own handler (verify against
//  object.hpp).
void zmq::object_t::process_term_req (own_t *object_)
{
    zmq_assert (false);
}

364
void zmq::object_t::process_term (int linger_)
Martin Sustrik's avatar
Martin Sustrik committed
365
{
Martin Sustrik's avatar
Martin Sustrik committed
366
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
367 368
}

369
void zmq::object_t::process_term_ack ()
Martin Sustrik's avatar
Martin Sustrik committed
370
{
Martin Sustrik's avatar
Martin Sustrik committed
371
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
372 373
}

374 375 376 377 378
//  Handler for the 'reap' command. The socket_ argument is ignored and
//  this implementation asserts unconditionally; presumably classes that
//  expect the command provide their own handler (verify against
//  object.hpp).
void zmq::object_t::process_reap (class socket_base_t *socket_)
{
    zmq_assert (false);
}

379 380 381 382 383
//  Handler for the 'reaped' command. This implementation asserts
//  unconditionally; presumably classes that expect the command provide
//  their own handler (verify against object.hpp).
void zmq::object_t::process_reaped ()
{
    zmq_assert (false);
}

384 385 386 387 388
//  Called by process_command after the plug, own, attach and bind
//  handlers. This implementation asserts unconditionally; presumably
//  classes that receive those commands provide their own version
//  (verify against object.hpp).
void zmq::object_t::process_seqnum ()
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
389
//  Hands the command to the context, addressed by the thread ID of the
//  command's destination object. cmd_.destination is dereferenced, so
//  it must not be NULL.
void zmq::object_t::send_command (command_t &cmd_)
{
    const uint32_t target_tid = cmd_.destination->get_tid ();
    ctx->send_command (target_tid, cmd_);
}