object.cpp 10.5 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include <string.h>
32
#include <stdarg.h>
33

Martin Sustrik's avatar
Martin Sustrik committed
34
#include "object.hpp"
35
#include "ctx.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
36
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
37
#include "pipe.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
38
#include "io_thread.hpp"
39
#include "session_base.hpp"
40
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
41

Martin Sustrik's avatar
Martin Sustrik committed
42
zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) :
43
    ctx (ctx_),
Martin Sustrik's avatar
Martin Sustrik committed
44
    tid (tid_)
Martin Sustrik's avatar
Martin Sustrik committed
45 46 47
{
}

Martin Sustrik's avatar
Martin Sustrik committed
48
zmq::object_t::object_t (object_t *parent_) :
49
    ctx (parent_->ctx),
Martin Sustrik's avatar
Martin Sustrik committed
50
    tid (parent_->tid)
Martin Sustrik's avatar
Martin Sustrik committed
51 52 53
{
}

Martin Sustrik's avatar
Martin Sustrik committed
54
zmq::object_t::~object_t ()
Martin Sustrik's avatar
Martin Sustrik committed
55 56 57
{
}

Martin Sustrik's avatar
Martin Sustrik committed
58
uint32_t zmq::object_t::get_tid ()
Martin Sustrik's avatar
Martin Sustrik committed
59
{
Martin Sustrik's avatar
Martin Sustrik committed
60
    return tid;
Martin Sustrik's avatar
Martin Sustrik committed
61 62
}

63 64 65 66 67
void zmq::object_t::set_tid(uint32_t id)
{
    tid = id;
}

68
zmq::ctx_t *zmq::object_t::get_ctx ()
69
{
70
    return ctx;
71 72
}

Martin Sustrik's avatar
Martin Sustrik committed
73
void zmq::object_t::process_command (command_t &cmd_)
Martin Sustrik's avatar
Martin Sustrik committed
74 75 76
{
    switch (cmd_.type) {

77 78
    case command_t::activate_read:
        process_activate_read ();
79 80
        break;

81 82
    case command_t::activate_write:
        process_activate_write (cmd_.args.activate_write.msgs_read);
Martin Sustrik's avatar
Martin Sustrik committed
83 84
        break;

85 86
    case command_t::stop:
        process_stop ();
Martin Sustrik's avatar
Martin Sustrik committed
87 88
        break;

89 90
    case command_t::plug:
        process_plug ();
91
        process_seqnum ();
92
        break;
Martin Sustrik's avatar
Martin Sustrik committed
93

94 95
    case command_t::own:
        process_own (cmd_.args.own.object);
96
        process_seqnum ();
97
        break;
Martin Sustrik's avatar
Martin Sustrik committed
98

99
    case command_t::attach:
100
        process_attach (cmd_.args.attach.engine);
101
        process_seqnum ();
102
        break;
103

Martin Sustrik's avatar
Martin Sustrik committed
104
    case command_t::bind:
105
        process_bind (cmd_.args.bind.pipe);
106
        process_seqnum ();
107
        break;
Martin Sustrik's avatar
Martin Sustrik committed
108

109 110 111 112
    case command_t::hiccup:
        process_hiccup (cmd_.args.hiccup.pipe);
        break;

Martin Sustrik's avatar
Martin Sustrik committed
113 114
    case command_t::pipe_term:
        process_pipe_term ();
115
        break;
Martin Sustrik's avatar
Martin Sustrik committed
116 117 118

    case command_t::pipe_term_ack:
        process_pipe_term_ack ();
119
        break;
Martin Sustrik's avatar
Martin Sustrik committed
120

121 122 123 124
    case command_t::pipe_hwm:
        process_pipe_hwm (cmd_.args.pipe_hwm.inhwm, cmd_.args.pipe_hwm.outhwm);
        break;

125 126
    case command_t::term_req:
        process_term_req (cmd_.args.term_req.object);
127
        break;
128

129
    case command_t::term:
130
        process_term (cmd_.args.term.linger);
131
        break;
Martin Sustrik's avatar
Martin Sustrik committed
132

133 134
    case command_t::term_ack:
        process_term_ack ();
135
        break;
Martin Sustrik's avatar
Martin Sustrik committed
136

137 138 139 140
    case command_t::term_endpoint:
        process_term_endpoint (cmd_.args.term_endpoint.endpoint);
        break;

141 142 143 144
    case command_t::reap:
        process_reap (cmd_.args.reap.socket);
        break;

145 146 147 148
    case command_t::reaped:
        process_reaped ();
        break;

149 150 151 152
    case command_t::inproc_connected:
        process_seqnum ();
        break;

153
    case command_t::done:
Martin Sustrik's avatar
Martin Sustrik committed
154
    default:
Martin Sustrik's avatar
Martin Sustrik committed
155
        zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
156 157 158
    }
}

159 160
int zmq::object_t::register_endpoint (const char *addr_,
        const endpoint_t &endpoint_)
161
{
162
    return ctx->register_endpoint (addr_, endpoint_);
163 164
}

Martin Hurton's avatar
Martin Hurton committed
165 166 167 168 169 170
int zmq::object_t::unregister_endpoint (
        const std::string &addr_, socket_base_t *socket_)
{
    return ctx->unregister_endpoint (addr_, socket_);
}

171 172
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
{
173
    return ctx->unregister_endpoints (socket_);
174 175
}

176
zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
177
{
178
    return ctx->find_endpoint (addr_);
179 180
}

Martin Hurton's avatar
Martin Hurton committed
181 182
void zmq::object_t::pend_connection (const std::string &addr_,
        const endpoint_t &endpoint_, pipe_t **pipes_)
183
{
Martin Hurton's avatar
Martin Hurton committed
184
    ctx->pend_connection (addr_, endpoint_, pipes_);
185 186
}

187
void zmq::object_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
188
{
189
    return ctx->connect_pending(addr_, bind_socket_);
190 191
}

192 193 194 195 196
void zmq::object_t::destroy_socket (socket_base_t *socket_)
{
    ctx->destroy_socket (socket_);
}

197
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
Martin Sustrik's avatar
Martin Sustrik committed
198
{
199
    return ctx->choose_io_thread (affinity_);
Martin Sustrik's avatar
Martin Sustrik committed
200 201
}

Martin Sustrik's avatar
Martin Sustrik committed
202
void zmq::object_t::send_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
203
{
204
    //  'stop' command goes always from administrative thread to
205
    //  the current object.
Martin Sustrik's avatar
Martin Sustrik committed
206 207 208
    command_t cmd;
    cmd.destination = this;
    cmd.type = command_t::stop;
Martin Sustrik's avatar
Martin Sustrik committed
209
    ctx->send_command (tid, cmd);
Martin Sustrik's avatar
Martin Sustrik committed
210 211
}

212
void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
Martin Sustrik's avatar
Martin Sustrik committed
213
{
214 215
    if (inc_seqnum_)
        destination_->inc_seqnum ();
216

Martin Sustrik's avatar
Martin Sustrik committed
217 218
    command_t cmd;
    cmd.destination = destination_;
219
    cmd.type = command_t::plug;
Martin Sustrik's avatar
Martin Sustrik committed
220 221 222
    send_command (cmd);
}

223
void zmq::object_t::send_own (own_t *destination_, own_t *object_)
Martin Sustrik's avatar
Martin Sustrik committed
224
{
225
    destination_->inc_seqnum ();
Martin Sustrik's avatar
Martin Sustrik committed
226 227
    command_t cmd;
    cmd.destination = destination_;
228 229
    cmd.type = command_t::own;
    cmd.args.own.object = object_;
Martin Sustrik's avatar
Martin Sustrik committed
230 231 232
    send_command (cmd);
}

233 234
void zmq::object_t::send_attach (session_base_t *destination_,
    i_engine *engine_, bool inc_seqnum_)
235
{
236 237 238
    if (inc_seqnum_)
        destination_->inc_seqnum ();

239 240 241 242 243 244 245
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::attach;
    cmd.args.attach.engine = engine_;
    send_command (cmd);
}

246
void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
247
    bool inc_seqnum_)
Martin Sustrik's avatar
Martin Sustrik committed
248
{
249 250 251
    if (inc_seqnum_)
        destination_->inc_seqnum ();

Martin Sustrik's avatar
Martin Sustrik committed
252 253
    command_t cmd;
    cmd.destination = destination_;
254
    cmd.type = command_t::bind;
255
    cmd.args.bind.pipe = pipe_;
Martin Sustrik's avatar
Martin Sustrik committed
256 257 258
    send_command (cmd);
}

259
void zmq::object_t::send_activate_read (pipe_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
260 261 262
{
    command_t cmd;
    cmd.destination = destination_;
263
    cmd.type = command_t::activate_read;
Martin Sustrik's avatar
Martin Sustrik committed
264 265 266
    send_command (cmd);
}

267
void zmq::object_t::send_activate_write (pipe_t *destination_,
Martin Hurton's avatar
Martin Hurton committed
268 269 270 271
    uint64_t msgs_read_)
{
    command_t cmd;
    cmd.destination = destination_;
272 273
    cmd.type = command_t::activate_write;
    cmd.args.activate_write.msgs_read = msgs_read_;
Martin Hurton's avatar
Martin Hurton committed
274 275 276
    send_command (cmd);
}

277 278 279 280 281 282 283 284 285
void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::hiccup;
    cmd.args.hiccup.pipe = pipe_;
    send_command (cmd);
}

286
void zmq::object_t::send_pipe_term (pipe_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
287 288 289 290 291 292 293
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::pipe_term;
    send_command (cmd);
}

294
void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
295 296 297 298 299 300 301
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::pipe_term_ack;
    send_command (cmd);
}

302 303 304 305 306 307 308 309 310 311
void zmq::object_t::send_pipe_hwm (pipe_t *destination_, int inhwm_, int outhwm_)
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::pipe_hwm;
    cmd.args.pipe_hwm.inhwm = inhwm_;
    cmd.args.pipe_hwm.outhwm = outhwm_;
    send_command (cmd);
}

312 313
void zmq::object_t::send_term_req (own_t *destination_,
    own_t *object_)
Martin Sustrik's avatar
Martin Sustrik committed
314 315 316
{
    command_t cmd;
    cmd.destination = destination_;
317 318
    cmd.type = command_t::term_req;
    cmd.args.term_req.object = object_;
Martin Sustrik's avatar
Martin Sustrik committed
319 320 321
    send_command (cmd);
}

322
void zmq::object_t::send_term (own_t *destination_, int linger_)
Martin Sustrik's avatar
Martin Sustrik committed
323 324 325
{
    command_t cmd;
    cmd.destination = destination_;
326
    cmd.type = command_t::term;
327
    cmd.args.term.linger = linger_;
Martin Sustrik's avatar
Martin Sustrik committed
328 329 330
    send_command (cmd);
}

331
void zmq::object_t::send_term_ack (own_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
332 333 334
{
    command_t cmd;
    cmd.destination = destination_;
335
    cmd.type = command_t::term_ack;
Martin Sustrik's avatar
Martin Sustrik committed
336 337 338
    send_command (cmd);
}

339 340 341 342 343 344 345 346 347 348
void zmq::object_t::send_term_endpoint (own_t *destination_,
        std::string *endpoint_)
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::term_endpoint;
    cmd.args.term_endpoint.endpoint = endpoint_;
    send_command (cmd);
}

349 350 351 352 353 354 355 356 357
void zmq::object_t::send_reap (class socket_base_t *socket_)
{
    command_t cmd;
    cmd.destination = ctx->get_reaper ();
    cmd.type = command_t::reap;
    cmd.args.reap.socket = socket_;
    send_command (cmd);
}

358 359 360 361 362 363 364 365
void zmq::object_t::send_reaped ()
{
    command_t cmd;
    cmd.destination = ctx->get_reaper ();
    cmd.type = command_t::reaped;
    send_command (cmd);
}

366 367 368 369 370 371 372 373
void zmq::object_t::send_inproc_connected (zmq::socket_base_t *socket_)
{
    command_t cmd;
    cmd.destination = socket_;
    cmd.type = command_t::inproc_connected;
    send_command (cmd);
}

374 375 376 377 378 379 380 381
void zmq::object_t::send_done ()
{
    command_t cmd;
    cmd.destination = NULL;
    cmd.type = command_t::done;
    ctx->send_command (ctx_t::term_tid, cmd);
}

Martin Sustrik's avatar
Martin Sustrik committed
382
void zmq::object_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
383
{
Martin Sustrik's avatar
Martin Sustrik committed
384
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
385 386
}

387
void zmq::object_t::process_plug ()
Martin Sustrik's avatar
Martin Sustrik committed
388
{
Martin Sustrik's avatar
Martin Sustrik committed
389
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
390 391
}

392
void zmq::object_t::process_own (own_t *)
393 394 395 396
{
    zmq_assert (false);
}

397
void zmq::object_t::process_attach (i_engine *)
Martin Sustrik's avatar
Martin Sustrik committed
398
{
Martin Sustrik's avatar
Martin Sustrik committed
399
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
400 401
}

402
void zmq::object_t::process_bind (pipe_t *)
Martin Sustrik's avatar
Martin Sustrik committed
403 404 405 406
{
    zmq_assert (false);
}

407
void zmq::object_t::process_activate_read ()
Martin Sustrik's avatar
Martin Sustrik committed
408
{
Martin Sustrik's avatar
Martin Sustrik committed
409
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
410 411
}

412
void zmq::object_t::process_activate_write (uint64_t)
Martin Hurton's avatar
Martin Hurton committed
413 414 415 416
{
    zmq_assert (false);
}

417
void zmq::object_t::process_hiccup (void *)
418 419 420 421
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
422 423 424 425 426 427 428 429 430 431
void zmq::object_t::process_pipe_term ()
{
    zmq_assert (false);
}

void zmq::object_t::process_pipe_term_ack ()
{
    zmq_assert (false);
}

432 433 434 435 436
void zmq::object_t::process_pipe_hwm (int, int)
{
    zmq_assert (false);
}

437
void zmq::object_t::process_term_req (own_t *)
Martin Sustrik's avatar
Martin Sustrik committed
438
{
Martin Sustrik's avatar
Martin Sustrik committed
439
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
440 441
}

442
void zmq::object_t::process_term (int)
Martin Sustrik's avatar
Martin Sustrik committed
443
{
Martin Sustrik's avatar
Martin Sustrik committed
444
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
445 446
}

447
void zmq::object_t::process_term_ack ()
Martin Sustrik's avatar
Martin Sustrik committed
448
{
Martin Sustrik's avatar
Martin Sustrik committed
449
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
450 451
}

452 453 454 455 456
void zmq::object_t::process_term_endpoint (std::string *)
{
    zmq_assert (false);
}

457
void zmq::object_t::process_reap (class socket_base_t *)
458 459 460 461
{
    zmq_assert (false);
}

462 463 464 465 466
void zmq::object_t::process_reaped ()
{
    zmq_assert (false);
}

467 468 469 470 471
void zmq::object_t::process_seqnum ()
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
472
void zmq::object_t::send_command (command_t &cmd_)
Martin Sustrik's avatar
Martin Sustrik committed
473
{
Martin Sustrik's avatar
Martin Sustrik committed
474
    ctx->send_command (cmd_.destination->get_tid (), cmd_);
Martin Sustrik's avatar
Martin Sustrik committed
475 476
}