object.cpp 10 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include <string.h>
32
#include <stdarg.h>
33

Martin Sustrik's avatar
Martin Sustrik committed
34
#include "object.hpp"
35
#include "ctx.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
36
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
37
#include "pipe.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
38
#include "io_thread.hpp"
39
#include "session_base.hpp"
40
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
41

Martin Sustrik's avatar
Martin Sustrik committed
42
zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) :
43
    ctx (ctx_),
Martin Sustrik's avatar
Martin Sustrik committed
44
    tid (tid_)
Martin Sustrik's avatar
Martin Sustrik committed
45 46 47
{
}

Martin Sustrik's avatar
Martin Sustrik committed
48
zmq::object_t::object_t (object_t *parent_) :
49
    ctx (parent_->ctx),
Martin Sustrik's avatar
Martin Sustrik committed
50
    tid (parent_->tid)
Martin Sustrik's avatar
Martin Sustrik committed
51 52 53
{
}

Martin Sustrik's avatar
Martin Sustrik committed
54
zmq::object_t::~object_t ()
Martin Sustrik's avatar
Martin Sustrik committed
55 56 57
{
}

Martin Sustrik's avatar
Martin Sustrik committed
58
uint32_t zmq::object_t::get_tid ()
Martin Sustrik's avatar
Martin Sustrik committed
59
{
Martin Sustrik's avatar
Martin Sustrik committed
60
    return tid;
Martin Sustrik's avatar
Martin Sustrik committed
61 62
}

63 64 65 66 67
void zmq::object_t::set_tid(uint32_t id)
{
    tid = id;
}

68
zmq::ctx_t *zmq::object_t::get_ctx ()
69
{
70
    return ctx;
71 72
}

Martin Sustrik's avatar
Martin Sustrik committed
73
void zmq::object_t::process_command (command_t &cmd_)
Martin Sustrik's avatar
Martin Sustrik committed
74 75 76
{
    switch (cmd_.type) {

77 78
    case command_t::activate_read:
        process_activate_read ();
79 80
        break;

81 82
    case command_t::activate_write:
        process_activate_write (cmd_.args.activate_write.msgs_read);
Martin Sustrik's avatar
Martin Sustrik committed
83 84
        break;

85 86
    case command_t::stop:
        process_stop ();
Martin Sustrik's avatar
Martin Sustrik committed
87 88
        break;

89 90
    case command_t::plug:
        process_plug ();
91
        process_seqnum ();
92
        break;
Martin Sustrik's avatar
Martin Sustrik committed
93

94 95
    case command_t::own:
        process_own (cmd_.args.own.object);
96
        process_seqnum ();
97
        break;
Martin Sustrik's avatar
Martin Sustrik committed
98

99
    case command_t::attach:
100
        process_attach (cmd_.args.attach.engine);
101
        process_seqnum ();
102
        break;
103

Martin Sustrik's avatar
Martin Sustrik committed
104
    case command_t::bind:
105
        process_bind (cmd_.args.bind.pipe);
106
        process_seqnum ();
107
        break;
Martin Sustrik's avatar
Martin Sustrik committed
108

109 110 111 112
    case command_t::hiccup:
        process_hiccup (cmd_.args.hiccup.pipe);
        break;

Martin Sustrik's avatar
Martin Sustrik committed
113 114
    case command_t::pipe_term:
        process_pipe_term ();
115
        break;
Martin Sustrik's avatar
Martin Sustrik committed
116 117 118

    case command_t::pipe_term_ack:
        process_pipe_term_ack ();
119
        break;
Martin Sustrik's avatar
Martin Sustrik committed
120

121 122 123 124
    case command_t::pipe_hwm:
        process_pipe_hwm (cmd_.args.pipe_hwm.inhwm, cmd_.args.pipe_hwm.outhwm);
        break;

125 126
    case command_t::term_req:
        process_term_req (cmd_.args.term_req.object);
127
        break;
128

129
    case command_t::term:
130
        process_term (cmd_.args.term.linger);
131
        break;
Martin Sustrik's avatar
Martin Sustrik committed
132

133 134
    case command_t::term_ack:
        process_term_ack ();
135
        break;
Martin Sustrik's avatar
Martin Sustrik committed
136

137 138 139 140
    case command_t::reap:
        process_reap (cmd_.args.reap.socket);
        break;

141 142 143 144
    case command_t::reaped:
        process_reaped ();
        break;

145 146 147 148
    case command_t::inproc_connected:
        process_seqnum ();
        break;

149
    case command_t::done:
Martin Sustrik's avatar
Martin Sustrik committed
150
    default:
Martin Sustrik's avatar
Martin Sustrik committed
151
        zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
152 153 154
    }
}

155 156
int zmq::object_t::register_endpoint (const char *addr_,
        const endpoint_t &endpoint_)
157
{
158
    return ctx->register_endpoint (addr_, endpoint_);
159 160
}

Martin Hurton's avatar
Martin Hurton committed
161 162 163 164 165 166
int zmq::object_t::unregister_endpoint (
        const std::string &addr_, socket_base_t *socket_)
{
    return ctx->unregister_endpoint (addr_, socket_);
}

167 168
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
{
169
    return ctx->unregister_endpoints (socket_);
170 171
}

172
zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
173
{
174
    return ctx->find_endpoint (addr_);
175 176
}

Martin Hurton's avatar
Martin Hurton committed
177 178
void zmq::object_t::pend_connection (const std::string &addr_,
        const endpoint_t &endpoint_, pipe_t **pipes_)
179
{
Martin Hurton's avatar
Martin Hurton committed
180
    ctx->pend_connection (addr_, endpoint_, pipes_);
181 182
}

183
void zmq::object_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
184
{
185
    return ctx->connect_pending(addr_, bind_socket_);
186 187
}

188 189 190 191 192
void zmq::object_t::destroy_socket (socket_base_t *socket_)
{
    ctx->destroy_socket (socket_);
}

193
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
Martin Sustrik's avatar
Martin Sustrik committed
194
{
195
    return ctx->choose_io_thread (affinity_);
Martin Sustrik's avatar
Martin Sustrik committed
196 197
}

Martin Sustrik's avatar
Martin Sustrik committed
198
void zmq::object_t::send_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
199
{
200
    //  'stop' command goes always from administrative thread to
201
    //  the current object.
Martin Sustrik's avatar
Martin Sustrik committed
202 203 204
    command_t cmd;
    cmd.destination = this;
    cmd.type = command_t::stop;
Martin Sustrik's avatar
Martin Sustrik committed
205
    ctx->send_command (tid, cmd);
Martin Sustrik's avatar
Martin Sustrik committed
206 207
}

208
void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
Martin Sustrik's avatar
Martin Sustrik committed
209
{
210 211
    if (inc_seqnum_)
        destination_->inc_seqnum ();
212

Martin Sustrik's avatar
Martin Sustrik committed
213 214
    command_t cmd;
    cmd.destination = destination_;
215
    cmd.type = command_t::plug;
Martin Sustrik's avatar
Martin Sustrik committed
216 217 218
    send_command (cmd);
}

219
void zmq::object_t::send_own (own_t *destination_, own_t *object_)
Martin Sustrik's avatar
Martin Sustrik committed
220
{
221
    destination_->inc_seqnum ();
Martin Sustrik's avatar
Martin Sustrik committed
222 223
    command_t cmd;
    cmd.destination = destination_;
224 225
    cmd.type = command_t::own;
    cmd.args.own.object = object_;
Martin Sustrik's avatar
Martin Sustrik committed
226 227 228
    send_command (cmd);
}

229 230
void zmq::object_t::send_attach (session_base_t *destination_,
    i_engine *engine_, bool inc_seqnum_)
231
{
232 233 234
    if (inc_seqnum_)
        destination_->inc_seqnum ();

235 236 237 238 239 240 241
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::attach;
    cmd.args.attach.engine = engine_;
    send_command (cmd);
}

242
void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
243
    bool inc_seqnum_)
Martin Sustrik's avatar
Martin Sustrik committed
244
{
245 246 247
    if (inc_seqnum_)
        destination_->inc_seqnum ();

Martin Sustrik's avatar
Martin Sustrik committed
248 249
    command_t cmd;
    cmd.destination = destination_;
250
    cmd.type = command_t::bind;
251
    cmd.args.bind.pipe = pipe_;
Martin Sustrik's avatar
Martin Sustrik committed
252 253 254
    send_command (cmd);
}

255
void zmq::object_t::send_activate_read (pipe_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
256 257 258
{
    command_t cmd;
    cmd.destination = destination_;
259
    cmd.type = command_t::activate_read;
Martin Sustrik's avatar
Martin Sustrik committed
260 261 262
    send_command (cmd);
}

263
void zmq::object_t::send_activate_write (pipe_t *destination_,
Martin Hurton's avatar
Martin Hurton committed
264 265 266 267
    uint64_t msgs_read_)
{
    command_t cmd;
    cmd.destination = destination_;
268 269
    cmd.type = command_t::activate_write;
    cmd.args.activate_write.msgs_read = msgs_read_;
Martin Hurton's avatar
Martin Hurton committed
270 271 272
    send_command (cmd);
}

273 274 275 276 277 278 279 280 281
void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::hiccup;
    cmd.args.hiccup.pipe = pipe_;
    send_command (cmd);
}

282
void zmq::object_t::send_pipe_term (pipe_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
283 284 285 286 287 288 289
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::pipe_term;
    send_command (cmd);
}

290
void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
291 292 293 294 295 296 297
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::pipe_term_ack;
    send_command (cmd);
}

298 299 300 301 302 303 304 305 306 307
void zmq::object_t::send_pipe_hwm (pipe_t *destination_, int inhwm_, int outhwm_)
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::pipe_hwm;
    cmd.args.pipe_hwm.inhwm = inhwm_;
    cmd.args.pipe_hwm.outhwm = outhwm_;
    send_command (cmd);
}

308 309
void zmq::object_t::send_term_req (own_t *destination_,
    own_t *object_)
Martin Sustrik's avatar
Martin Sustrik committed
310 311 312
{
    command_t cmd;
    cmd.destination = destination_;
313 314
    cmd.type = command_t::term_req;
    cmd.args.term_req.object = object_;
Martin Sustrik's avatar
Martin Sustrik committed
315 316 317
    send_command (cmd);
}

318
void zmq::object_t::send_term (own_t *destination_, int linger_)
Martin Sustrik's avatar
Martin Sustrik committed
319 320 321
{
    command_t cmd;
    cmd.destination = destination_;
322
    cmd.type = command_t::term;
323
    cmd.args.term.linger = linger_;
Martin Sustrik's avatar
Martin Sustrik committed
324 325 326
    send_command (cmd);
}

327
void zmq::object_t::send_term_ack (own_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
328 329 330
{
    command_t cmd;
    cmd.destination = destination_;
331
    cmd.type = command_t::term_ack;
Martin Sustrik's avatar
Martin Sustrik committed
332 333 334
    send_command (cmd);
}

335 336 337 338 339 340 341 342 343
void zmq::object_t::send_reap (class socket_base_t *socket_)
{
    command_t cmd;
    cmd.destination = ctx->get_reaper ();
    cmd.type = command_t::reap;
    cmd.args.reap.socket = socket_;
    send_command (cmd);
}

344 345 346 347 348 349 350 351
void zmq::object_t::send_reaped ()
{
    command_t cmd;
    cmd.destination = ctx->get_reaper ();
    cmd.type = command_t::reaped;
    send_command (cmd);
}

352 353 354 355 356 357 358 359
void zmq::object_t::send_inproc_connected (zmq::socket_base_t *socket_)
{
    command_t cmd;
    cmd.destination = socket_;
    cmd.type = command_t::inproc_connected;
    send_command (cmd);
}

360 361 362 363 364 365 366 367
void zmq::object_t::send_done ()
{
    command_t cmd;
    cmd.destination = NULL;
    cmd.type = command_t::done;
    ctx->send_command (ctx_t::term_tid, cmd);
}

Martin Sustrik's avatar
Martin Sustrik committed
368
void zmq::object_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
369
{
Martin Sustrik's avatar
Martin Sustrik committed
370
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
371 372
}

373
void zmq::object_t::process_plug ()
Martin Sustrik's avatar
Martin Sustrik committed
374
{
Martin Sustrik's avatar
Martin Sustrik committed
375
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
376 377
}

378
void zmq::object_t::process_own (own_t *)
379 380 381 382
{
    zmq_assert (false);
}

383
void zmq::object_t::process_attach (i_engine *)
Martin Sustrik's avatar
Martin Sustrik committed
384
{
Martin Sustrik's avatar
Martin Sustrik committed
385
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
386 387
}

388
void zmq::object_t::process_bind (pipe_t *)
Martin Sustrik's avatar
Martin Sustrik committed
389 390 391 392
{
    zmq_assert (false);
}

393
void zmq::object_t::process_activate_read ()
Martin Sustrik's avatar
Martin Sustrik committed
394
{
Martin Sustrik's avatar
Martin Sustrik committed
395
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
396 397
}

398
void zmq::object_t::process_activate_write (uint64_t)
Martin Hurton's avatar
Martin Hurton committed
399 400 401 402
{
    zmq_assert (false);
}

403
void zmq::object_t::process_hiccup (void *)
404 405 406 407
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
408 409 410 411 412 413 414 415 416 417
void zmq::object_t::process_pipe_term ()
{
    zmq_assert (false);
}

void zmq::object_t::process_pipe_term_ack ()
{
    zmq_assert (false);
}

418 419 420 421 422
void zmq::object_t::process_pipe_hwm (int, int)
{
    zmq_assert (false);
}

423
void zmq::object_t::process_term_req (own_t *)
Martin Sustrik's avatar
Martin Sustrik committed
424
{
Martin Sustrik's avatar
Martin Sustrik committed
425
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
426 427
}

428
void zmq::object_t::process_term (int)
Martin Sustrik's avatar
Martin Sustrik committed
429
{
Martin Sustrik's avatar
Martin Sustrik committed
430
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
431 432
}

433
void zmq::object_t::process_term_ack ()
Martin Sustrik's avatar
Martin Sustrik committed
434
{
Martin Sustrik's avatar
Martin Sustrik committed
435
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
436 437
}

438
void zmq::object_t::process_reap (class socket_base_t *)
439 440 441 442
{
    zmq_assert (false);
}

443 444 445 446 447
void zmq::object_t::process_reaped ()
{
    zmq_assert (false);
}

448 449 450 451 452
void zmq::object_t::process_seqnum ()
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
453
void zmq::object_t::send_command (command_t &cmd_)
Martin Sustrik's avatar
Martin Sustrik committed
454
{
Martin Sustrik's avatar
Martin Sustrik committed
455
    ctx->send_command (cmd_.destination->get_tid (), cmd_);
Martin Sustrik's avatar
Martin Sustrik committed
456 457
}