object.cpp 8.72 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include <string.h>
21
#include <stdarg.h>
22

Martin Sustrik's avatar
Martin Sustrik committed
23
#include "object.hpp"
24
#include "ctx.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
25
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
26
#include "pipe.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
27
#include "io_thread.hpp"
28
#include "session_base.hpp"
29
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
30

Martin Sustrik's avatar
Martin Sustrik committed
31
//  Constructs an object from an explicit context pointer and thread ID;
//  both values are stored as given.
zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) :
    ctx (ctx_), tid (tid_)
{
}

Martin Sustrik's avatar
Martin Sustrik committed
37
//  Constructs an object that takes over the context pointer and thread ID
//  of 'parent_'. 'parent_' is dereferenced unconditionally, so it must not
//  be NULL.
zmq::object_t::object_t (object_t *parent_) :
    ctx (parent_->ctx), tid (parent_->tid)
{
}

Martin Sustrik's avatar
Martin Sustrik committed
43
//  The destructor body is intentionally empty; nothing is released here.
zmq::object_t::~object_t ()
{
}

Martin Sustrik's avatar
Martin Sustrik committed
47
//  Returns the thread ID currently stored in this object.
uint32_t zmq::object_t::get_tid ()
{
    return this->tid;
}

52 53 54 55 56
//  Overwrites the thread ID stored in this object with 'tid_'.
void zmq::object_t::set_tid (uint32_t tid_)
{
    this->tid = tid_;
}

57
//  Returns the context pointer stored in this object.
zmq::ctx_t *zmq::object_t::get_ctx ()
{
    return this->ctx;
}

Martin Sustrik's avatar
Martin Sustrik committed
62
//  Dispatches 'cmd_' to the process_* handler that matches its type,
//  forwarding the argument stored in the command where the handler takes
//  one. The plug, own, attach, bind and inproc_connected commands are
//  additionally followed by a call to process_seqnum (). A 'done' command,
//  or any type not listed here, trips an assertion.
void zmq::object_t::process_command (command_t &cmd_)
{
    switch (cmd_.type) {

    //  Commands whose handling ends with process_seqnum ().

    case command_t::plug:
        process_plug ();
        process_seqnum ();
        break;

    case command_t::own:
        process_own (cmd_.args.own.object);
        process_seqnum ();
        break;

    case command_t::attach:
        process_attach (cmd_.args.attach.engine);
        process_seqnum ();
        break;

    case command_t::bind:
        process_bind (cmd_.args.bind.pipe);
        process_seqnum ();
        break;

    case command_t::inproc_connected:
        //  No dedicated handler; only process_seqnum () is invoked.
        process_seqnum ();
        break;

    //  Pipe-related commands.

    case command_t::activate_read:
        process_activate_read ();
        break;

    case command_t::activate_write:
        process_activate_write (cmd_.args.activate_write.msgs_read);
        break;

    case command_t::hiccup:
        process_hiccup (cmd_.args.hiccup.pipe);
        break;

    case command_t::pipe_term:
        process_pipe_term ();
        break;

    case command_t::pipe_term_ack:
        process_pipe_term_ack ();
        break;

    //  Stop, termination and reaping commands.

    case command_t::stop:
        process_stop ();
        break;

    case command_t::term_req:
        process_term_req (cmd_.args.term_req.object);
        break;

    case command_t::term:
        process_term (cmd_.args.term.linger);
        break;

    case command_t::term_ack:
        process_term_ack ();
        break;

    case command_t::reap:
        process_reap (cmd_.args.reap.socket);
        break;

    case command_t::reaped:
        process_reaped ();
        break;

    //  'done' is not expected to be dispatched through this function;
    //  it is treated the same as an unknown command type.
    case command_t::done:
    default:
        zmq_assert (false);
    }
}

140
int zmq::object_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
141
{
142
    return ctx->register_endpoint (addr_, endpoint_);
143 144 145 146
}

//  Forwards the request to unregister the endpoints of 'socket_' to the
//  context.
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
{
    ctx->unregister_endpoints (socket_);
}

150
//  Asks the context to look up the endpoint for 'addr_' and returns
//  whatever the context returns.
zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
{
    return this->ctx->find_endpoint (addr_);
}

155
void zmq::object_t::pend_connection (const char *addr_, pending_connection_t &pending_connection_)
156 157 158 159
{
    ctx->pend_connection (addr_, pending_connection_);
}

160
void zmq::object_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
161
{
162
    return ctx->connect_pending(addr_, bind_socket_);
163 164
}

165 166 167 168 169
//  Forwards the request to destroy 'socket_' to the context.
void zmq::object_t::destroy_socket (socket_base_t *socket_)
{
    this->ctx->destroy_socket (socket_);
}

170
//  Asks the context to choose an I/O thread for the given affinity value
//  and returns the context's choice.
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
{
    return this->ctx->choose_io_thread (affinity_);
}

Martin Sustrik's avatar
Martin Sustrik committed
175
void zmq::object_t::send_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
176
{
177 178
    //  'stop' command goes always from administrative thread to
    //  the current object. 
Martin Sustrik's avatar
Martin Sustrik committed
179 180 181
    command_t cmd;
    cmd.destination = this;
    cmd.type = command_t::stop;
Martin Sustrik's avatar
Martin Sustrik committed
182
    ctx->send_command (tid, cmd);
Martin Sustrik's avatar
Martin Sustrik committed
183 184
}

185
//  Sends a 'plug' command to 'destination_'. When 'inc_seqnum_' is true
//  the destination's sequence number is incremented before sending.
void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
{
    if (inc_seqnum_) {
        destination_->inc_seqnum ();
    }

    command_t command;
    command.type = command_t::plug;
    command.destination = destination_;
    send_command (command);
}

196
//  Sends an 'own' command carrying 'object_' to 'destination_'. The
//  destination's sequence number is always incremented before sending.
void zmq::object_t::send_own (own_t *destination_, own_t *object_)
{
    destination_->inc_seqnum ();

    command_t command;
    command.type = command_t::own;
    command.destination = destination_;
    command.args.own.object = object_;
    send_command (command);
}

206 207
//  Sends an 'attach' command carrying 'engine_' to the session
//  'destination_'. When 'inc_seqnum_' is true the destination's sequence
//  number is incremented before sending.
void zmq::object_t::send_attach (session_base_t *destination_,
    i_engine *engine_, bool inc_seqnum_)
{
    if (inc_seqnum_) {
        destination_->inc_seqnum ();
    }

    command_t command;
    command.type = command_t::attach;
    command.destination = destination_;
    command.args.attach.engine = engine_;
    send_command (command);
}

219
void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
220
    bool inc_seqnum_)
Martin Sustrik's avatar
Martin Sustrik committed
221
{
222 223 224
    if (inc_seqnum_)
        destination_->inc_seqnum ();

Martin Sustrik's avatar
Martin Sustrik committed
225 226
    command_t cmd;
    cmd.destination = destination_;
227
    cmd.type = command_t::bind;
228
    cmd.args.bind.pipe = pipe_;
Martin Sustrik's avatar
Martin Sustrik committed
229 230 231
    send_command (cmd);
}

232
//  Sends an 'activate_read' command to the pipe 'destination_'.
void zmq::object_t::send_activate_read (pipe_t *destination_)
{
    command_t command;
    command.type = command_t::activate_read;
    command.destination = destination_;
    send_command (command);
}

240
void zmq::object_t::send_activate_write (pipe_t *destination_,
Martin Hurton's avatar
Martin Hurton committed
241 242 243 244
    uint64_t msgs_read_)
{
    command_t cmd;
    cmd.destination = destination_;
245 246
    cmd.type = command_t::activate_write;
    cmd.args.activate_write.msgs_read = msgs_read_;
Martin Hurton's avatar
Martin Hurton committed
247 248 249
    send_command (cmd);
}

250 251 252 253 254 255 256 257 258
//  Sends a 'hiccup' command to the pipe 'destination_', passing the
//  untyped pointer 'pipe_' along as the command's argument.
void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
{
    command_t command;
    command.type = command_t::hiccup;
    command.destination = destination_;
    command.args.hiccup.pipe = pipe_;
    send_command (command);
}

259
//  Sends a 'pipe_term' command to the pipe 'destination_'.
void zmq::object_t::send_pipe_term (pipe_t *destination_)
{
    command_t command;
    command.type = command_t::pipe_term;
    command.destination = destination_;
    send_command (command);
}

267
//  Sends a 'pipe_term_ack' command to the pipe 'destination_'.
void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
{
    command_t command;
    command.type = command_t::pipe_term_ack;
    command.destination = destination_;
    send_command (command);
}

275 276
//  Sends a 'term_req' command carrying 'object_' to 'destination_'.
void zmq::object_t::send_term_req (own_t *destination_,
    own_t *object_)
{
    command_t command;
    command.type = command_t::term_req;
    command.destination = destination_;
    command.args.term_req.object = object_;
    send_command (command);
}

285
//  Sends a 'term' command to 'destination_', passing 'linger_' along as
//  the command's argument.
void zmq::object_t::send_term (own_t *destination_, int linger_)
{
    command_t command;
    command.type = command_t::term;
    command.destination = destination_;
    command.args.term.linger = linger_;
    send_command (command);
}

294
//  Sends a 'term_ack' command to 'destination_'.
void zmq::object_t::send_term_ack (own_t *destination_)
{
    command_t command;
    command.type = command_t::term_ack;
    command.destination = destination_;
    send_command (command);
}

302 303 304 305 306 307 308 309 310
void zmq::object_t::send_reap (class socket_base_t *socket_)
{
    command_t cmd;
    cmd.destination = ctx->get_reaper ();
    cmd.type = command_t::reap;
    cmd.args.reap.socket = socket_;
    send_command (cmd);
}

311 312 313 314 315 316 317 318
void zmq::object_t::send_reaped ()
{
    command_t cmd;
    cmd.destination = ctx->get_reaper ();
    cmd.type = command_t::reaped;
    send_command (cmd);
}

319 320 321 322 323 324 325 326
//  Sends an 'inproc_connected' command to 'socket_'. The command carries
//  no arguments and this function does not touch the socket's sequence
//  number.
void zmq::object_t::send_inproc_connected (zmq::socket_base_t *socket_)
{
    command_t command;
    command.type = command_t::inproc_connected;
    command.destination = socket_;
    send_command (command);
}

327 328 329 330 331 332 333 334
void zmq::object_t::send_done ()
{
    command_t cmd;
    cmd.destination = NULL;
    cmd.type = command_t::done;
    ctx->send_command (ctx_t::term_tid, cmd);
}

Martin Sustrik's avatar
Martin Sustrik committed
335
void zmq::object_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
336
{
Martin Sustrik's avatar
Martin Sustrik committed
337
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
338 339
}

340
void zmq::object_t::process_plug ()
Martin Sustrik's avatar
Martin Sustrik committed
341
{
Martin Sustrik's avatar
Martin Sustrik committed
342
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
343 344
}

345
//  Handler for the 'own' command in this class: ignores its argument and
//  always trips an assertion. NOTE(review): presumably replaced by classes
//  that expect this command -- confirm against object.hpp.
void zmq::object_t::process_own (own_t *)
{
    zmq_assert (false);
}

350
//  Handler for the 'attach' command in this class: ignores its argument
//  and always trips an assertion. NOTE(review): presumably replaced by
//  classes that expect this command -- confirm against object.hpp.
void zmq::object_t::process_attach (i_engine *)
{
    zmq_assert (false);
}

355
//  Handler for the 'bind' command in this class: ignores its argument and
//  always trips an assertion. NOTE(review): presumably replaced by classes
//  that expect this command -- confirm against object.hpp.
void zmq::object_t::process_bind (pipe_t *)
{
    zmq_assert (false);
}

360
void zmq::object_t::process_activate_read ()
Martin Sustrik's avatar
Martin Sustrik committed
361
{
Martin Sustrik's avatar
Martin Sustrik committed
362
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
363 364
}

365
//  Handler for the 'activate_write' command in this class: ignores its
//  argument and always trips an assertion. NOTE(review): presumably
//  replaced by classes that expect this command -- confirm against
//  object.hpp.
void zmq::object_t::process_activate_write (uint64_t)
{
    zmq_assert (false);
}

370
void zmq::object_t::process_hiccup (void *)
371 372 373 374
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
375 376 377 378 379 380 381 382 383 384
//  Handler for the 'pipe_term' command in this class: always trips an
//  assertion. NOTE(review): presumably replaced by classes that expect
//  this command -- confirm against object.hpp.
void zmq::object_t::process_pipe_term ()
{
    zmq_assert (false);
}

//  Handler for the 'pipe_term_ack' command in this class: always trips an
//  assertion. NOTE(review): presumably replaced by classes that expect
//  this command -- confirm against object.hpp.
void zmq::object_t::process_pipe_term_ack ()
{
    zmq_assert (false);
}

385
//  Handler for the 'term_req' command in this class: ignores its argument
//  and always trips an assertion. NOTE(review): presumably replaced by
//  classes that expect this command -- confirm against object.hpp.
void zmq::object_t::process_term_req (own_t *)
{
    zmq_assert (false);
}

390
void zmq::object_t::process_term (int)
Martin Sustrik's avatar
Martin Sustrik committed
391
{
Martin Sustrik's avatar
Martin Sustrik committed
392
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
393 394
}

395
void zmq::object_t::process_term_ack ()
Martin Sustrik's avatar
Martin Sustrik committed
396
{
Martin Sustrik's avatar
Martin Sustrik committed
397
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
398 399
}

400
void zmq::object_t::process_reap (class socket_base_t *)
401 402 403 404
{
    zmq_assert (false);
}

405 406 407 408 409
//  Handler for the 'reaped' command in this class: always trips an
//  assertion. NOTE(review): presumably replaced by classes that expect
//  this command -- confirm against object.hpp.
void zmq::object_t::process_reaped ()
{
    zmq_assert (false);
}

410 411 412 413 414
//  Sequence-number handler in this class: always trips an assertion.
//  process_command () calls it after plug, own, attach, bind and
//  inproc_connected. NOTE(review): presumably replaced by classes that
//  receive those commands -- confirm against object.hpp.
void zmq::object_t::process_seqnum ()
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
415
//  Hands 'cmd_' to the context, addressed to the thread ID reported by the
//  command's destination object. The destination is dereferenced, so it
//  must not be NULL.
void zmq::object_t::send_command (command_t &cmd_)
{
    const uint32_t destination_tid = cmd_.destination->get_tid ();
    ctx->send_command (destination_tid, cmd_);
}