object.cpp 8.9 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include <string.h>
21
#include <stdarg.h>
22

Martin Sustrik's avatar
Martin Sustrik committed
23
#include "object.hpp"
24
#include "ctx.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
25
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
26
#include "pipe.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
27
#include "io_thread.hpp"
28
#include "session_base.hpp"
29
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
30

Martin Sustrik's avatar
Martin Sustrik committed
31
//  Constructs an object that stores the supplied context pointer and
//  thread ID.
zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) :
    ctx (ctx_), tid (tid_)
{
}

Martin Sustrik's avatar
Martin Sustrik committed
37
//  Constructs an object that copies the context pointer and thread ID of
//  its parent. 'parent_' is dereferenced unconditionally, so it must not
//  be NULL.
zmq::object_t::object_t (object_t *parent_) :
    ctx (parent_->ctx), tid (parent_->tid)
{
}

Martin Sustrik's avatar
Martin Sustrik committed
43
//  The destructor performs no work of its own.
zmq::object_t::~object_t ()
{
}

Martin Sustrik's avatar
Martin Sustrik committed
47
uint32_t zmq::object_t::get_tid ()
Martin Sustrik's avatar
Martin Sustrik committed
48
{
Martin Sustrik's avatar
Martin Sustrik committed
49
    return tid;
Martin Sustrik's avatar
Martin Sustrik committed
50 51
}

52 53 54 55 56
void zmq::object_t::set_tid(uint32_t id)
{
    tid = id;
}

57
//  Returns the context pointer this object was constructed with.
zmq::ctx_t *zmq::object_t::get_ctx ()
{
    return this->ctx;
}

Martin Sustrik's avatar
Martin Sustrik committed
62
//  Dispatches an incoming command to the matching process_* handler,
//  passing along the arguments carried in the command. Command types
//  that have no handler here (including 'done') trip an assertion.
void zmq::object_t::process_command (command_t &cmd_)
{
    switch (cmd_.type) {

    //  Commands whose handler is followed by a process_seqnum () call.
    case command_t::plug:
        process_plug ();
        process_seqnum ();
        break;

    case command_t::own:
        process_own (cmd_.args.own.object);
        process_seqnum ();
        break;

    case command_t::attach:
        process_attach (cmd_.args.attach.engine);
        process_seqnum ();
        break;

    case command_t::bind:
        process_bind (cmd_.args.bind.pipe);
        process_seqnum ();
        break;

    //  This command has no handler of its own; only process_seqnum ()
    //  is invoked.
    case command_t::inproc_connected:
        process_seqnum ();
        break;

    //  Pipe-related commands.
    case command_t::activate_read:
        process_activate_read ();
        break;

    case command_t::activate_write:
        process_activate_write (cmd_.args.activate_write.msgs_read);
        break;

    case command_t::hiccup:
        process_hiccup (cmd_.args.hiccup.pipe);
        break;

    case command_t::pipe_term:
        process_pipe_term ();
        break;

    case command_t::pipe_term_ack:
        process_pipe_term_ack ();
        break;

    //  Stop and termination commands.
    case command_t::stop:
        process_stop ();
        break;

    case command_t::term_req:
        process_term_req (cmd_.args.term_req.object);
        break;

    case command_t::term:
        process_term (cmd_.args.term.linger);
        break;

    case command_t::term_ack:
        process_term_ack ();
        break;

    //  Reaper commands.
    case command_t::reap:
        process_reap (cmd_.args.reap.socket);
        break;

    case command_t::reaped:
        process_reaped ();
        break;

    //  'done' and any unrecognised command type are not expected here.
    case command_t::done:
    default:
        zmq_assert (false);
    }
}

140 141
int zmq::object_t::register_endpoint (const char *addr_,
        const endpoint_t &endpoint_)
142
{
143
    return ctx->register_endpoint (addr_, endpoint_);
144 145
}

Martin Hurton's avatar
Martin Hurton committed
146 147 148 149 150 151
//  Forwards the removal of a single endpoint to the context and returns
//  the context's result code unchanged.
int zmq::object_t::unregister_endpoint (
        const std::string &addr_, socket_base_t *socket_)
{
    const int rc = ctx->unregister_endpoint (addr_, socket_);
    return rc;
}

152 153
//  Forwards the request to the context's unregister_endpoints () for the
//  given socket.
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
{
    ctx->unregister_endpoints (socket_);
}

157
//  Looks up 'addr_' via the context and returns whatever endpoint the
//  context reports.
zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
{
    return this->ctx->find_endpoint (addr_);
}

Martin Hurton's avatar
Martin Hurton committed
162 163
void zmq::object_t::pend_connection (const std::string &addr_,
        const endpoint_t &endpoint_, pipe_t **pipes_)
164
{
Martin Hurton's avatar
Martin Hurton committed
165
    ctx->pend_connection (addr_, endpoint_, pipes_);
166 167
}

168
void zmq::object_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
169
{
170
    return ctx->connect_pending(addr_, bind_socket_);
171 172
}

173 174 175 176 177
//  Hands the socket over to the context's destroy_socket ().
void zmq::object_t::destroy_socket (socket_base_t *socket_)
{
    this->ctx->destroy_socket (socket_);
}

178
//  Asks the context to pick an I/O thread for the given affinity value
//  and returns the context's answer unchanged.
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
{
    zmq::io_thread_t *chosen = ctx->choose_io_thread (affinity_);
    return chosen;
}

Martin Sustrik's avatar
Martin Sustrik committed
183
void zmq::object_t::send_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
184
{
185 186
    //  'stop' command goes always from administrative thread to
    //  the current object. 
Martin Sustrik's avatar
Martin Sustrik committed
187 188 189
    command_t cmd;
    cmd.destination = this;
    cmd.type = command_t::stop;
Martin Sustrik's avatar
Martin Sustrik committed
190
    ctx->send_command (tid, cmd);
Martin Sustrik's avatar
Martin Sustrik committed
191 192
}

193
//  Sends a 'plug' command to 'destination_'. When 'inc_seqnum_' is true
//  the destination's inc_seqnum () is called before the command is sent.
void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
{
    if (inc_seqnum_) {
        destination_->inc_seqnum ();
    }

    command_t command;
    command.type = command_t::plug;
    command.destination = destination_;
    send_command (command);
}

204
//  Sends an 'own' command carrying 'object_' to 'destination_'. The
//  destination's inc_seqnum () is always called first.
void zmq::object_t::send_own (own_t *destination_, own_t *object_)
{
    destination_->inc_seqnum ();

    command_t command;
    command.type = command_t::own;
    command.destination = destination_;
    command.args.own.object = object_;
    send_command (command);
}

214 215
//  Sends an 'attach' command carrying 'engine_' to the destination
//  session. When 'inc_seqnum_' is true the destination's inc_seqnum ()
//  is called before the command is sent.
void zmq::object_t::send_attach (session_base_t *destination_,
    i_engine *engine_, bool inc_seqnum_)
{
    if (inc_seqnum_) {
        destination_->inc_seqnum ();
    }

    command_t command;
    command.type = command_t::attach;
    command.destination = destination_;
    command.args.attach.engine = engine_;
    send_command (command);
}

227
void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
228
    bool inc_seqnum_)
Martin Sustrik's avatar
Martin Sustrik committed
229
{
230 231 232
    if (inc_seqnum_)
        destination_->inc_seqnum ();

Martin Sustrik's avatar
Martin Sustrik committed
233 234
    command_t cmd;
    cmd.destination = destination_;
235
    cmd.type = command_t::bind;
236
    cmd.args.bind.pipe = pipe_;
Martin Sustrik's avatar
Martin Sustrik committed
237 238 239
    send_command (cmd);
}

240
//  Sends an argument-less 'activate_read' command to the given pipe.
void zmq::object_t::send_activate_read (pipe_t *destination_)
{
    command_t command;
    command.type = command_t::activate_read;
    command.destination = destination_;
    send_command (command);
}

248
void zmq::object_t::send_activate_write (pipe_t *destination_,
Martin Hurton's avatar
Martin Hurton committed
249 250 251 252
    uint64_t msgs_read_)
{
    command_t cmd;
    cmd.destination = destination_;
253 254
    cmd.type = command_t::activate_write;
    cmd.args.activate_write.msgs_read = msgs_read_;
Martin Hurton's avatar
Martin Hurton committed
255 256 257
    send_command (cmd);
}

258 259 260 261 262 263 264 265 266
//  Sends a 'hiccup' command to the given pipe, carrying the opaque
//  'pipe_' pointer as its argument.
void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
{
    command_t command;
    command.type = command_t::hiccup;
    command.destination = destination_;
    command.args.hiccup.pipe = pipe_;
    send_command (command);
}

267
//  Sends an argument-less 'pipe_term' command to the given pipe.
void zmq::object_t::send_pipe_term (pipe_t *destination_)
{
    command_t command;
    command.type = command_t::pipe_term;
    command.destination = destination_;
    send_command (command);
}

275
//  Sends an argument-less 'pipe_term_ack' command to the given pipe.
void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
{
    command_t command;
    command.type = command_t::pipe_term_ack;
    command.destination = destination_;
    send_command (command);
}

283 284
//  Sends a 'term_req' command to 'destination_', carrying 'object_' as
//  its argument.
void zmq::object_t::send_term_req (own_t *destination_,
    own_t *object_)
{
    command_t command;
    command.type = command_t::term_req;
    command.destination = destination_;
    command.args.term_req.object = object_;
    send_command (command);
}

293
//  Sends a 'term' command to 'destination_', carrying the 'linger_'
//  value as its argument.
void zmq::object_t::send_term (own_t *destination_, int linger_)
{
    command_t command;
    command.type = command_t::term;
    command.destination = destination_;
    command.args.term.linger = linger_;
    send_command (command);
}

302
//  Sends an argument-less 'term_ack' command to 'destination_'.
void zmq::object_t::send_term_ack (own_t *destination_)
{
    command_t command;
    command.type = command_t::term_ack;
    command.destination = destination_;
    send_command (command);
}

310 311 312 313 314 315 316 317 318
void zmq::object_t::send_reap (class socket_base_t *socket_)
{
    command_t cmd;
    cmd.destination = ctx->get_reaper ();
    cmd.type = command_t::reap;
    cmd.args.reap.socket = socket_;
    send_command (cmd);
}

319 320 321 322 323 324 325 326
void zmq::object_t::send_reaped ()
{
    command_t cmd;
    cmd.destination = ctx->get_reaper ();
    cmd.type = command_t::reaped;
    send_command (cmd);
}

327 328 329 330 331 332 333 334
//  Sends an argument-less 'inproc_connected' command to the given socket.
//  Note that no inc_seqnum () call is made here.
void zmq::object_t::send_inproc_connected (zmq::socket_base_t *socket_)
{
    command_t command;
    command.type = command_t::inproc_connected;
    command.destination = socket_;
    send_command (command);
}

335 336 337 338 339 340 341 342
void zmq::object_t::send_done ()
{
    command_t cmd;
    cmd.destination = NULL;
    cmd.type = command_t::done;
    ctx->send_command (ctx_t::term_tid, cmd);
}

Martin Sustrik's avatar
Martin Sustrik committed
343
void zmq::object_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
344
{
Martin Sustrik's avatar
Martin Sustrik committed
345
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
346 347
}

348
void zmq::object_t::process_plug ()
Martin Sustrik's avatar
Martin Sustrik committed
349
{
Martin Sustrik's avatar
Martin Sustrik committed
350
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
351 352
}

353
//  Handler invoked by process_command () for the 'own' command. The
//  argument is ignored and this implementation always trips an assertion.
//  NOTE(review): presumably overridden by classes that actually expect
//  this command; confirm in object.hpp.
void zmq::object_t::process_own (own_t *)
{
    zmq_assert (false);
}

358
//  Handler invoked by process_command () for the 'attach' command. The
//  argument is ignored and this implementation always trips an assertion.
//  NOTE(review): presumably overridden by classes that actually expect
//  this command; confirm in object.hpp.
void zmq::object_t::process_attach (i_engine *)
{
    zmq_assert (false);
}

363
//  Handler invoked by process_command () for the 'bind' command. The
//  argument is ignored and this implementation always trips an assertion.
//  NOTE(review): presumably overridden by classes that actually expect
//  this command; confirm in object.hpp.
void zmq::object_t::process_bind (pipe_t *)
{
    zmq_assert (false);
}

368
void zmq::object_t::process_activate_read ()
Martin Sustrik's avatar
Martin Sustrik committed
369
{
Martin Sustrik's avatar
Martin Sustrik committed
370
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
371 372
}

373
void zmq::object_t::process_activate_write (uint64_t)
Martin Hurton's avatar
Martin Hurton committed
374 375 376 377
{
    zmq_assert (false);
}

378
void zmq::object_t::process_hiccup (void *)
379 380 381 382
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
383 384 385 386 387 388 389 390 391 392
//  Handler invoked by process_command () for the 'pipe_term' command.
//  This implementation always trips an assertion.
//  NOTE(review): presumably overridden by classes that actually expect
//  this command; confirm in object.hpp.
void zmq::object_t::process_pipe_term ()
{
    zmq_assert (false);
}

//  Handler invoked by process_command () for the 'pipe_term_ack' command.
//  This implementation always trips an assertion.
//  NOTE(review): presumably overridden by classes that actually expect
//  this command; confirm in object.hpp.
void zmq::object_t::process_pipe_term_ack ()
{
    zmq_assert (false);
}

393
//  Handler invoked by process_command () for the 'term_req' command. The
//  argument is ignored and this implementation always trips an assertion.
//  NOTE(review): presumably overridden by classes that actually expect
//  this command; confirm in object.hpp.
void zmq::object_t::process_term_req (own_t *)
{
    zmq_assert (false);
}

398
void zmq::object_t::process_term (int)
Martin Sustrik's avatar
Martin Sustrik committed
399
{
Martin Sustrik's avatar
Martin Sustrik committed
400
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
401 402
}

403
void zmq::object_t::process_term_ack ()
Martin Sustrik's avatar
Martin Sustrik committed
404
{
Martin Sustrik's avatar
Martin Sustrik committed
405
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
406 407
}

408
void zmq::object_t::process_reap (class socket_base_t *)
409 410 411 412
{
    zmq_assert (false);
}

413 414 415 416 417
//  Handler invoked by process_command () for the 'reaped' command.
//  This implementation always trips an assertion.
//  NOTE(review): presumably overridden by classes that actually expect
//  this command; confirm in object.hpp.
void zmq::object_t::process_reaped ()
{
    zmq_assert (false);
}

418 419 420 421 422
//  Invoked by process_command () after the 'plug', 'own', 'attach' and
//  'bind' handlers, and on its own for 'inproc_connected'. This
//  implementation always trips an assertion.
//  NOTE(review): presumably overridden by classes that track sequence
//  numbers; confirm in object.hpp.
void zmq::object_t::process_seqnum ()
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
423
//  Hands the command to the context, addressed to the thread ID reported
//  by the command's destination object. The destination is dereferenced
//  unconditionally, so it must not be NULL.
void zmq::object_t::send_command (command_t &cmd_)
{
    const uint32_t target_tid = cmd_.destination->get_tid ();
    ctx->send_command (target_tid, cmd_);
}