object.cpp 11.2 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include <string.h>
32
#include <stdarg.h>
33

Martin Sustrik's avatar
Martin Sustrik committed
34
#include "object.hpp"
35
#include "ctx.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
36
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
37
#include "pipe.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
38
#include "io_thread.hpp"
39
#include "session_base.hpp"
40
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
41

42
zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) : _ctx (ctx_), _tid (tid_)
Martin Sustrik's avatar
Martin Sustrik committed
43 44 45
{
}

Martin Sustrik's avatar
Martin Sustrik committed
46
zmq::object_t::object_t (object_t *parent_) :
47 48
    _ctx (parent_->_ctx),
    _tid (parent_->_tid)
Martin Sustrik's avatar
Martin Sustrik committed
49 50 51
{
}

Martin Sustrik's avatar
Martin Sustrik committed
52
zmq::object_t::~object_t ()
Martin Sustrik's avatar
Martin Sustrik committed
53 54 55
{
}

Martin Sustrik's avatar
Martin Sustrik committed
56
uint32_t zmq::object_t::get_tid ()
Martin Sustrik's avatar
Martin Sustrik committed
57
{
58
    return _tid;
Martin Sustrik's avatar
Martin Sustrik committed
59 60
}

61
void zmq::object_t::set_tid (uint32_t id_)
62
{
63
    _tid = id_;
64 65
}

66
zmq::ctx_t *zmq::object_t::get_ctx ()
67
{
68
    return _ctx;
69 70
}

Martin Sustrik's avatar
Martin Sustrik committed
71
void zmq::object_t::process_command (command_t &cmd_)
Martin Sustrik's avatar
Martin Sustrik committed
72 73
{
    switch (cmd_.type) {
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
        case command_t::activate_read:
            process_activate_read ();
            break;

        case command_t::activate_write:
            process_activate_write (cmd_.args.activate_write.msgs_read);
            break;

        case command_t::stop:
            process_stop ();
            break;

        case command_t::plug:
            process_plug ();
            process_seqnum ();
            break;

        case command_t::own:
            process_own (cmd_.args.own.object);
            process_seqnum ();
            break;

        case command_t::attach:
            process_attach (cmd_.args.attach.engine);
            process_seqnum ();
            break;

        case command_t::bind:
            process_bind (cmd_.args.bind.pipe);
            process_seqnum ();
            break;

        case command_t::hiccup:
            process_hiccup (cmd_.args.hiccup.pipe);
            break;

        case command_t::pipe_term:
            process_pipe_term ();
            break;

        case command_t::pipe_term_ack:
            process_pipe_term_ack ();
            break;

        case command_t::pipe_hwm:
            process_pipe_hwm (cmd_.args.pipe_hwm.inhwm,
                              cmd_.args.pipe_hwm.outhwm);
            break;

        case command_t::term_req:
            process_term_req (cmd_.args.term_req.object);
            break;

        case command_t::term:
            process_term (cmd_.args.term.linger);
            break;

        case command_t::term_ack:
            process_term_ack ();
            break;

        case command_t::term_endpoint:
            process_term_endpoint (cmd_.args.term_endpoint.endpoint);
            break;

        case command_t::reap:
            process_reap (cmd_.args.reap.socket);
            break;

        case command_t::reaped:
            process_reaped ();
            break;

        case command_t::inproc_connected:
            process_seqnum ();
            break;

        case command_t::done:
        default:
            zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
154 155 156
    }
}

157
int zmq::object_t::register_endpoint (const char *addr_,
158
                                      const endpoint_t &endpoint_)
159
{
160
    return _ctx->register_endpoint (addr_, endpoint_);
161 162
}

163 164
int zmq::object_t::unregister_endpoint (const std::string &addr_,
                                        socket_base_t *socket_)
Martin Hurton's avatar
Martin Hurton committed
165
{
166
    return _ctx->unregister_endpoint (addr_, socket_);
Martin Hurton's avatar
Martin Hurton committed
167 168
}

169 170
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
{
171
    return _ctx->unregister_endpoints (socket_);
172 173
}

174
zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
175
{
176
    return _ctx->find_endpoint (addr_);
177 178
}

Martin Hurton's avatar
Martin Hurton committed
179
void zmq::object_t::pend_connection (const std::string &addr_,
180 181
                                     const endpoint_t &endpoint_,
                                     pipe_t **pipes_)
182
{
183
    _ctx->pend_connection (addr_, endpoint_, pipes_);
184 185
}

186 187
void zmq::object_t::connect_pending (const char *addr_,
                                     zmq::socket_base_t *bind_socket_)
188
{
189
    return _ctx->connect_pending (addr_, bind_socket_);
190 191
}

192 193
void zmq::object_t::destroy_socket (socket_base_t *socket_)
{
194
    _ctx->destroy_socket (socket_);
195 196
}

197
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
Martin Sustrik's avatar
Martin Sustrik committed
198
{
199
    return _ctx->choose_io_thread (affinity_);
Martin Sustrik's avatar
Martin Sustrik committed
200 201
}

Martin Sustrik's avatar
Martin Sustrik committed
202
void zmq::object_t::send_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
203
{
204
    //  'stop' command goes always from administrative thread to
205
    //  the current object.
Martin Sustrik's avatar
Martin Sustrik committed
206 207 208
    command_t cmd;
    cmd.destination = this;
    cmd.type = command_t::stop;
209
    _ctx->send_command (_tid, cmd);
Martin Sustrik's avatar
Martin Sustrik committed
210 211
}

212
void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
Martin Sustrik's avatar
Martin Sustrik committed
213
{
214 215
    if (inc_seqnum_)
        destination_->inc_seqnum ();
216

Martin Sustrik's avatar
Martin Sustrik committed
217 218
    command_t cmd;
    cmd.destination = destination_;
219
    cmd.type = command_t::plug;
Martin Sustrik's avatar
Martin Sustrik committed
220 221 222
    send_command (cmd);
}

223
void zmq::object_t::send_own (own_t *destination_, own_t *object_)
Martin Sustrik's avatar
Martin Sustrik committed
224
{
225
    destination_->inc_seqnum ();
Martin Sustrik's avatar
Martin Sustrik committed
226 227
    command_t cmd;
    cmd.destination = destination_;
228 229
    cmd.type = command_t::own;
    cmd.args.own.object = object_;
Martin Sustrik's avatar
Martin Sustrik committed
230 231 232
    send_command (cmd);
}

233
void zmq::object_t::send_attach (session_base_t *destination_,
234 235
                                 i_engine *engine_,
                                 bool inc_seqnum_)
236
{
237 238 239
    if (inc_seqnum_)
        destination_->inc_seqnum ();

240 241 242 243 244 245 246
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::attach;
    cmd.args.attach.engine = engine_;
    send_command (cmd);
}

247 248 249
void zmq::object_t::send_bind (own_t *destination_,
                               pipe_t *pipe_,
                               bool inc_seqnum_)
Martin Sustrik's avatar
Martin Sustrik committed
250
{
251 252 253
    if (inc_seqnum_)
        destination_->inc_seqnum ();

Martin Sustrik's avatar
Martin Sustrik committed
254 255
    command_t cmd;
    cmd.destination = destination_;
256
    cmd.type = command_t::bind;
257
    cmd.args.bind.pipe = pipe_;
Martin Sustrik's avatar
Martin Sustrik committed
258 259 260
    send_command (cmd);
}

261
void zmq::object_t::send_activate_read (pipe_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
262 263 264
{
    command_t cmd;
    cmd.destination = destination_;
265
    cmd.type = command_t::activate_read;
Martin Sustrik's avatar
Martin Sustrik committed
266 267 268
    send_command (cmd);
}

269
void zmq::object_t::send_activate_write (pipe_t *destination_,
270
                                         uint64_t msgs_read_)
Martin Hurton's avatar
Martin Hurton committed
271 272 273
{
    command_t cmd;
    cmd.destination = destination_;
274 275
    cmd.type = command_t::activate_write;
    cmd.args.activate_write.msgs_read = msgs_read_;
Martin Hurton's avatar
Martin Hurton committed
276 277 278
    send_command (cmd);
}

279 280 281 282 283 284 285 286 287
void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::hiccup;
    cmd.args.hiccup.pipe = pipe_;
    send_command (cmd);
}

288
void zmq::object_t::send_pipe_term (pipe_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
289 290 291 292 293 294 295
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::pipe_term;
    send_command (cmd);
}

296
void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
297 298 299 300 301 302 303
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::pipe_term_ack;
    send_command (cmd);
}

304 305 306
void zmq::object_t::send_pipe_hwm (pipe_t *destination_,
                                   int inhwm_,
                                   int outhwm_)
307 308 309 310 311 312 313 314 315
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::pipe_hwm;
    cmd.args.pipe_hwm.inhwm = inhwm_;
    cmd.args.pipe_hwm.outhwm = outhwm_;
    send_command (cmd);
}

316
void zmq::object_t::send_term_req (own_t *destination_, own_t *object_)
Martin Sustrik's avatar
Martin Sustrik committed
317 318 319
{
    command_t cmd;
    cmd.destination = destination_;
320 321
    cmd.type = command_t::term_req;
    cmd.args.term_req.object = object_;
Martin Sustrik's avatar
Martin Sustrik committed
322 323 324
    send_command (cmd);
}

325
void zmq::object_t::send_term (own_t *destination_, int linger_)
Martin Sustrik's avatar
Martin Sustrik committed
326 327 328
{
    command_t cmd;
    cmd.destination = destination_;
329
    cmd.type = command_t::term;
330
    cmd.args.term.linger = linger_;
Martin Sustrik's avatar
Martin Sustrik committed
331 332 333
    send_command (cmd);
}

334
void zmq::object_t::send_term_ack (own_t *destination_)
Martin Sustrik's avatar
Martin Sustrik committed
335 336 337
{
    command_t cmd;
    cmd.destination = destination_;
338
    cmd.type = command_t::term_ack;
Martin Sustrik's avatar
Martin Sustrik committed
339 340 341
    send_command (cmd);
}

342
void zmq::object_t::send_term_endpoint (own_t *destination_,
343
                                        std::string *endpoint_)
344 345 346 347 348 349 350 351
{
    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::term_endpoint;
    cmd.args.term_endpoint.endpoint = endpoint_;
    send_command (cmd);
}

352 353 354
void zmq::object_t::send_reap (class socket_base_t *socket_)
{
    command_t cmd;
355
    cmd.destination = _ctx->get_reaper ();
356 357 358 359 360
    cmd.type = command_t::reap;
    cmd.args.reap.socket = socket_;
    send_command (cmd);
}

361 362 363
void zmq::object_t::send_reaped ()
{
    command_t cmd;
364
    cmd.destination = _ctx->get_reaper ();
365 366 367 368
    cmd.type = command_t::reaped;
    send_command (cmd);
}

369 370 371 372 373 374 375 376
void zmq::object_t::send_inproc_connected (zmq::socket_base_t *socket_)
{
    command_t cmd;
    cmd.destination = socket_;
    cmd.type = command_t::inproc_connected;
    send_command (cmd);
}

377 378 379 380 381
void zmq::object_t::send_done ()
{
    command_t cmd;
    cmd.destination = NULL;
    cmd.type = command_t::done;
382
    _ctx->send_command (ctx_t::term_tid, cmd);
383 384
}

Martin Sustrik's avatar
Martin Sustrik committed
385
void zmq::object_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
386
{
Martin Sustrik's avatar
Martin Sustrik committed
387
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
388 389
}

390
void zmq::object_t::process_plug ()
Martin Sustrik's avatar
Martin Sustrik committed
391
{
Martin Sustrik's avatar
Martin Sustrik committed
392
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
393 394
}

395
void zmq::object_t::process_own (own_t *)
396 397 398 399
{
    zmq_assert (false);
}

400
void zmq::object_t::process_attach (i_engine *)
Martin Sustrik's avatar
Martin Sustrik committed
401
{
Martin Sustrik's avatar
Martin Sustrik committed
402
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
403 404
}

405
void zmq::object_t::process_bind (pipe_t *)
Martin Sustrik's avatar
Martin Sustrik committed
406 407 408 409
{
    zmq_assert (false);
}

410
void zmq::object_t::process_activate_read ()
Martin Sustrik's avatar
Martin Sustrik committed
411
{
Martin Sustrik's avatar
Martin Sustrik committed
412
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
413 414
}

415
void zmq::object_t::process_activate_write (uint64_t)
Martin Hurton's avatar
Martin Hurton committed
416 417 418 419
{
    zmq_assert (false);
}

420
void zmq::object_t::process_hiccup (void *)
421 422 423 424
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
425 426 427 428 429 430 431 432 433 434
void zmq::object_t::process_pipe_term ()
{
    zmq_assert (false);
}

void zmq::object_t::process_pipe_term_ack ()
{
    zmq_assert (false);
}

435 436 437 438 439
void zmq::object_t::process_pipe_hwm (int, int)
{
    zmq_assert (false);
}

440
void zmq::object_t::process_term_req (own_t *)
Martin Sustrik's avatar
Martin Sustrik committed
441
{
Martin Sustrik's avatar
Martin Sustrik committed
442
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
443 444
}

445
void zmq::object_t::process_term (int)
Martin Sustrik's avatar
Martin Sustrik committed
446
{
Martin Sustrik's avatar
Martin Sustrik committed
447
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
448 449
}

450
void zmq::object_t::process_term_ack ()
Martin Sustrik's avatar
Martin Sustrik committed
451
{
Martin Sustrik's avatar
Martin Sustrik committed
452
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
453 454
}

455 456 457 458 459
void zmq::object_t::process_term_endpoint (std::string *)
{
    zmq_assert (false);
}

460
void zmq::object_t::process_reap (class socket_base_t *)
461 462 463 464
{
    zmq_assert (false);
}

465 466 467 468 469
void zmq::object_t::process_reaped ()
{
    zmq_assert (false);
}

470 471 472 473 474
void zmq::object_t::process_seqnum ()
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
475
void zmq::object_t::send_command (command_t &cmd_)
Martin Sustrik's avatar
Martin Sustrik committed
476
{
477
    _ctx->send_command (cmd_.destination->get_tid (), cmd_);
Martin Sustrik's avatar
Martin Sustrik committed
478
}