object.cpp 9.54 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include <string.h>
31
#include <stdarg.h>
32

Martin Sustrik's avatar
Martin Sustrik committed
33
#include "object.hpp"
34
#include "ctx.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
36
#include "pipe.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
37
#include "io_thread.hpp"
38
#include "session_base.hpp"
39
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
40

Martin Sustrik's avatar
Martin Sustrik committed
41
//  Creates an object that remembers the supplied context pointer and
//  thread ID (tid).
zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) :
    ctx (ctx_), tid (tid_)
{
}

Martin Sustrik's avatar
Martin Sustrik committed
47
//  Creates an object that shares the context pointer and thread ID of
//  parent_. The parent is dereferenced, so parent_ must not be NULL.
zmq::object_t::object_t (object_t *parent_) :
    ctx (parent_->ctx), tid (parent_->tid)
{
}

Martin Sustrik's avatar
Martin Sustrik committed
53
//  Nothing to release here: the destructor body is intentionally empty.
zmq::object_t::~object_t ()
{
}

Martin Sustrik's avatar
Martin Sustrik committed
57
uint32_t zmq::object_t::get_tid ()
Martin Sustrik's avatar
Martin Sustrik committed
58
{
Martin Sustrik's avatar
Martin Sustrik committed
59
    return tid;
Martin Sustrik's avatar
Martin Sustrik committed
60 61
}

62 63 64 65 66
void zmq::object_t::set_tid(uint32_t id)
{
    tid = id;
}

67
//  Returns the context pointer this object was constructed with.
zmq::ctx_t *zmq::object_t::get_ctx ()
{
    return this->ctx;
}

Martin Sustrik's avatar
Martin Sustrik committed
72
//  Dispatches a received command to the matching process_* handler,
//  passing along whatever argument the command carries in cmd_.args.
//  The plug, own, attach and bind commands call process_seqnum () after
//  their handler; inproc_connected calls process_seqnum () only.
//  'done' and any unrecognised command type trip zmq_assert.
void zmq::object_t::process_command (command_t &cmd_)
{
    switch (cmd_.type) {

    //  Commands that are followed by process_seqnum ().
    case command_t::plug:
        process_plug ();
        process_seqnum ();
        break;

    case command_t::own:
        process_own (cmd_.args.own.object);
        process_seqnum ();
        break;

    case command_t::attach:
        process_attach (cmd_.args.attach.engine);
        process_seqnum ();
        break;

    case command_t::bind:
        process_bind (cmd_.args.bind.pipe);
        process_seqnum ();
        break;

    case command_t::inproc_connected:
        process_seqnum ();
        break;

    //  Pipe-related commands.
    case command_t::activate_read:
        process_activate_read ();
        break;

    case command_t::activate_write:
        process_activate_write (cmd_.args.activate_write.msgs_read);
        break;

    case command_t::hiccup:
        process_hiccup (cmd_.args.hiccup.pipe);
        break;

    case command_t::pipe_term:
        process_pipe_term ();
        break;

    case command_t::pipe_term_ack:
        process_pipe_term_ack ();
        break;

    //  Shutdown and termination commands.
    case command_t::stop:
        process_stop ();
        break;

    case command_t::term_req:
        process_term_req (cmd_.args.term_req.object);
        break;

    case command_t::term:
        process_term (cmd_.args.term.linger);
        break;

    case command_t::term_ack:
        process_term_ack ();
        break;

    case command_t::reap:
        process_reap (cmd_.args.reap.socket);
        break;

    case command_t::reaped:
        process_reaped ();
        break;

    //  'done' is not expected here; it shares the failure path with
    //  unknown command types.
    case command_t::done:
    default:
        zmq_assert (false);
    }
}

150 151
int zmq::object_t::register_endpoint (const char *addr_,
        const endpoint_t &endpoint_)
152
{
153
    return ctx->register_endpoint (addr_, endpoint_);
154 155
}

Martin Hurton's avatar
Martin Hurton committed
156 157 158 159 160 161
//  Forwards the request to unregister a single endpoint to the context
//  and returns the context's result code unchanged.
int zmq::object_t::unregister_endpoint (
        const std::string &addr_, socket_base_t *socket_)
{
    const int rc = ctx->unregister_endpoint (addr_, socket_);
    return rc;
}

162 163
//  Forwards the request to unregister the endpoints of socket_ to the
//  context.
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
{
    ctx->unregister_endpoints (socket_);
}

167
//  Looks up addr_ via the context and hands back whatever endpoint
//  the context returns.
zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
{
    return this->ctx->find_endpoint (addr_);
}

Martin Hurton's avatar
Martin Hurton committed
172 173
void zmq::object_t::pend_connection (const std::string &addr_,
        const endpoint_t &endpoint_, pipe_t **pipes_)
174
{
Martin Hurton's avatar
Martin Hurton committed
175
    ctx->pend_connection (addr_, endpoint_, pipes_);
176 177
}

178
void zmq::object_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
179
{
180
    return ctx->connect_pending(addr_, bind_socket_);
181 182
}

183 184 185 186 187
//  Forwards the request to destroy socket_ to the context.
void zmq::object_t::destroy_socket (socket_base_t *socket_)
{
    this->ctx->destroy_socket (socket_);
}

188
//  Asks the context to pick an I/O thread for the given affinity value
//  and returns the context's choice unchanged.
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
{
    io_thread_t *const chosen = ctx->choose_io_thread (affinity_);
    return chosen;
}

Martin Sustrik's avatar
Martin Sustrik committed
193
void zmq::object_t::send_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
194
{
195 196
    //  'stop' command goes always from administrative thread to
    //  the current object. 
Martin Sustrik's avatar
Martin Sustrik committed
197 198 199
    command_t cmd;
    cmd.destination = this;
    cmd.type = command_t::stop;
Martin Sustrik's avatar
Martin Sustrik committed
200
    ctx->send_command (tid, cmd);
Martin Sustrik's avatar
Martin Sustrik committed
201 202
}

203
//  Sends a 'plug' command to destination_. When inc_seqnum_ is set, the
//  destination's inc_seqnum () is called before the command is sent.
void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
{
    if (inc_seqnum_) {
        destination_->inc_seqnum ();
    }

    command_t plug_cmd;
    plug_cmd.type = command_t::plug;
    plug_cmd.destination = destination_;
    send_command (plug_cmd);
}

214
//  Sends an 'own' command carrying object_ to destination_. The
//  destination's inc_seqnum () is always called before sending.
void zmq::object_t::send_own (own_t *destination_, own_t *object_)
{
    destination_->inc_seqnum ();

    command_t own_cmd;
    own_cmd.type = command_t::own;
    own_cmd.destination = destination_;
    own_cmd.args.own.object = object_;
    send_command (own_cmd);
}

224 225
//  Sends an 'attach' command carrying engine_ to destination_. When
//  inc_seqnum_ is set, the destination's inc_seqnum () is called
//  before the command is sent.
void zmq::object_t::send_attach (session_base_t *destination_,
    i_engine *engine_, bool inc_seqnum_)
{
    if (inc_seqnum_) {
        destination_->inc_seqnum ();
    }

    command_t attach_cmd;
    attach_cmd.type = command_t::attach;
    attach_cmd.destination = destination_;
    attach_cmd.args.attach.engine = engine_;
    send_command (attach_cmd);
}

237
void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
238
    bool inc_seqnum_)
Martin Sustrik's avatar
Martin Sustrik committed
239
{
240 241 242
    if (inc_seqnum_)
        destination_->inc_seqnum ();

Martin Sustrik's avatar
Martin Sustrik committed
243 244
    command_t cmd;
    cmd.destination = destination_;
245
    cmd.type = command_t::bind;
246
    cmd.args.bind.pipe = pipe_;
Martin Sustrik's avatar
Martin Sustrik committed
247 248 249
    send_command (cmd);
}

250
//  Sends an argument-less 'activate_read' command to the given pipe.
void zmq::object_t::send_activate_read (pipe_t *destination_)
{
    command_t activate_cmd;
    activate_cmd.type = command_t::activate_read;
    activate_cmd.destination = destination_;
    send_command (activate_cmd);
}

258
void zmq::object_t::send_activate_write (pipe_t *destination_,
Martin Hurton's avatar
Martin Hurton committed
259 260 261 262
    uint64_t msgs_read_)
{
    command_t cmd;
    cmd.destination = destination_;
263 264
    cmd.type = command_t::activate_write;
    cmd.args.activate_write.msgs_read = msgs_read_;
Martin Hurton's avatar
Martin Hurton committed
265 266 267
    send_command (cmd);
}

268 269 270 271 272 273 274 275 276
//  Sends a 'hiccup' command to the given pipe, carrying the opaque
//  pipe_ pointer as the command's argument.
void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
{
    command_t hiccup_cmd;
    hiccup_cmd.type = command_t::hiccup;
    hiccup_cmd.destination = destination_;
    hiccup_cmd.args.hiccup.pipe = pipe_;
    send_command (hiccup_cmd);
}

277
//  Sends an argument-less 'pipe_term' command to the given pipe.
void zmq::object_t::send_pipe_term (pipe_t *destination_)
{
    command_t term_cmd;
    term_cmd.type = command_t::pipe_term;
    term_cmd.destination = destination_;
    send_command (term_cmd);
}

285
//  Sends an argument-less 'pipe_term_ack' command to the given pipe.
void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
{
    command_t ack_cmd;
    ack_cmd.type = command_t::pipe_term_ack;
    ack_cmd.destination = destination_;
    send_command (ack_cmd);
}

293 294
//  Sends a 'term_req' command to destination_, carrying object_ as the
//  command's argument.
void zmq::object_t::send_term_req (own_t *destination_,
    own_t *object_)
{
    command_t req_cmd;
    req_cmd.type = command_t::term_req;
    req_cmd.destination = destination_;
    req_cmd.args.term_req.object = object_;
    send_command (req_cmd);
}

303
//  Sends a 'term' command to destination_, carrying linger_ as the
//  command's linger argument.
void zmq::object_t::send_term (own_t *destination_, int linger_)
{
    command_t term_cmd;
    term_cmd.type = command_t::term;
    term_cmd.destination = destination_;
    term_cmd.args.term.linger = linger_;
    send_command (term_cmd);
}

312
//  Sends an argument-less 'term_ack' command to destination_.
void zmq::object_t::send_term_ack (own_t *destination_)
{
    command_t ack_cmd;
    ack_cmd.type = command_t::term_ack;
    ack_cmd.destination = destination_;
    send_command (ack_cmd);
}

320 321 322 323 324 325 326 327 328
void zmq::object_t::send_reap (class socket_base_t *socket_)
{
    command_t cmd;
    cmd.destination = ctx->get_reaper ();
    cmd.type = command_t::reap;
    cmd.args.reap.socket = socket_;
    send_command (cmd);
}

329 330 331 332 333 334 335 336
void zmq::object_t::send_reaped ()
{
    command_t cmd;
    cmd.destination = ctx->get_reaper ();
    cmd.type = command_t::reaped;
    send_command (cmd);
}

337 338 339 340 341 342 343 344
//  Sends an argument-less 'inproc_connected' command to socket_.
void zmq::object_t::send_inproc_connected (zmq::socket_base_t *socket_)
{
    command_t connected_cmd;
    connected_cmd.type = command_t::inproc_connected;
    connected_cmd.destination = socket_;
    send_command (connected_cmd);
}

345 346 347 348 349 350 351 352
void zmq::object_t::send_done ()
{
    command_t cmd;
    cmd.destination = NULL;
    cmd.type = command_t::done;
    ctx->send_command (ctx_t::term_tid, cmd);
}

Martin Sustrik's avatar
Martin Sustrik committed
353
void zmq::object_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
354
{
Martin Sustrik's avatar
Martin Sustrik committed
355
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
356 357
}

358
void zmq::object_t::process_plug ()
Martin Sustrik's avatar
Martin Sustrik committed
359
{
Martin Sustrik's avatar
Martin Sustrik committed
360
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
361 362
}

363
//  Handler invoked by process_command () for 'own'. The argument is
//  ignored and zmq_assert is tripped unconditionally; presumably
//  classes that expect the command provide their own version (confirm
//  in object.hpp).
void zmq::object_t::process_own (own_t *)
{
    zmq_assert (false);
}

368
//  Handler invoked by process_command () for 'attach'. The argument is
//  ignored and zmq_assert is tripped unconditionally; presumably
//  classes that expect the command provide their own version (confirm
//  in object.hpp).
void zmq::object_t::process_attach (i_engine *)
{
    zmq_assert (false);
}

373
//  Handler invoked by process_command () for 'bind'. The argument is
//  ignored and zmq_assert is tripped unconditionally; presumably
//  classes that expect the command provide their own version (confirm
//  in object.hpp).
void zmq::object_t::process_bind (pipe_t *)
{
    zmq_assert (false);
}

378
void zmq::object_t::process_activate_read ()
Martin Sustrik's avatar
Martin Sustrik committed
379
{
Martin Sustrik's avatar
Martin Sustrik committed
380
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
381 382
}

383
void zmq::object_t::process_activate_write (uint64_t)
Martin Hurton's avatar
Martin Hurton committed
384 385 386 387
{
    zmq_assert (false);
}

388
void zmq::object_t::process_hiccup (void *)
389 390 391 392
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
393 394 395 396 397 398 399 400 401 402
//  Handler invoked by process_command () for 'pipe_term'. This
//  implementation unconditionally trips zmq_assert; presumably classes
//  that expect the command provide their own version (confirm in
//  object.hpp).
void zmq::object_t::process_pipe_term ()
{
    zmq_assert (false);
}

//  Handler invoked by process_command () for 'pipe_term_ack'. This
//  implementation unconditionally trips zmq_assert; presumably classes
//  that expect the command provide their own version (confirm in
//  object.hpp).
void zmq::object_t::process_pipe_term_ack ()
{
    zmq_assert (false);
}

403
//  Handler invoked by process_command () for 'term_req'. The argument
//  is ignored and zmq_assert is tripped unconditionally; presumably
//  classes that expect the command provide their own version (confirm
//  in object.hpp).
void zmq::object_t::process_term_req (own_t *)
{
    zmq_assert (false);
}

408
void zmq::object_t::process_term (int)
Martin Sustrik's avatar
Martin Sustrik committed
409
{
Martin Sustrik's avatar
Martin Sustrik committed
410
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
411 412
}

413
void zmq::object_t::process_term_ack ()
Martin Sustrik's avatar
Martin Sustrik committed
414
{
Martin Sustrik's avatar
Martin Sustrik committed
415
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
416 417
}

418
void zmq::object_t::process_reap (class socket_base_t *)
419 420 421 422
{
    zmq_assert (false);
}

423 424 425 426 427
//  Handler invoked by process_command () for 'reaped'. This
//  implementation unconditionally trips zmq_assert; presumably classes
//  that expect the command provide their own version (confirm in
//  object.hpp).
void zmq::object_t::process_reaped ()
{
    zmq_assert (false);
}

428 429 430 431 432
//  Invoked by process_command () after the plug, own, attach and bind
//  handlers, and on its own for 'inproc_connected'. This implementation
//  unconditionally trips zmq_assert; presumably classes that receive
//  those commands provide their own version (confirm in object.hpp).
void zmq::object_t::process_seqnum ()
{
    zmq_assert (false);
}

Martin Sustrik's avatar
Martin Sustrik committed
433
//  Hands cmd_ to the context, addressed to the tid reported by the
//  command's destination object. The destination is dereferenced, so
//  cmd_.destination must not be NULL.
void zmq::object_t::send_command (command_t &cmd_)
{
    const uint32_t target_tid = cmd_.destination->get_tid ();
    ctx->send_command (target_tid, cmd_);
}