router.cpp 16 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#include "router.hpp"
33
#include "pipe.hpp"
34 35
#include "wire.hpp"
#include "random.hpp"
36
#include "likely.hpp"
37
#include "err.hpp"
38

39
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40
    routing_socket_base_t (parent_, tid_, sid_),
41 42 43 44 45 46 47 48 49
    _prefetched (false),
    _routing_id_sent (false),
    _current_in (NULL),
    _terminate_current_in (false),
    _more_in (false),
    _current_out (NULL),
    _more_out (false),
    _next_integral_routing_id (generate_random ()),
    _mandatory (false),
50
    //  raw_socket functionality in ROUTER is deprecated
51 52 53
    _raw_socket (false),
    _probe_router (false),
    _handover (false)
54
{
55
    options.type = ZMQ_ROUTER;
56
    options.recv_routing_id = true;
57
    options.raw_socket = false;
58

59 60
    _prefetched_id.init ();
    _prefetched_msg.init ();
61 62
}

63
zmq::router_t::~router_t ()
64
{
65 66 67
    zmq_assert (_anonymous_pipes.empty ());
    _prefetched_id.close ();
    _prefetched_msg.close ();
68 69
}

70 71 72
void zmq::router_t::xattach_pipe (pipe_t *pipe_,
                                  bool subscribe_to_all_,
                                  bool locally_initiated_)
73
{
74
    LIBZMQ_UNUSED (subscribe_to_all_);
75

76 77
    zmq_assert (pipe_);

78
    if (_probe_router) {
79 80
        msg_t probe_msg;
        int rc = probe_msg.init ();
81 82
        errno_assert (rc == 0);

83
        rc = pipe_->write (&probe_msg);
84
        // zmq_assert (rc) is not applicable here, since it is not a bug.
85 86
        LIBZMQ_UNUSED (rc);

87 88
        pipe_->flush ();

89
        rc = probe_msg.close ();
90 91 92
        errno_assert (rc == 0);
    }

93
    bool routing_id_ok = identify_peer (pipe_, locally_initiated_);
94
    if (routing_id_ok)
95
        _fq.attach (pipe_);
96
    else
97
        _anonymous_pipes.insert (pipe_);
98 99
}

100 101 102
int zmq::router_t::xsetsockopt (int option_,
                                const void *optval_,
                                size_t optvallen_)
103
{
104
    const bool is_int = (optvallen_ == sizeof (int));
105
    int value = 0;
106 107
    if (is_int)
        memcpy (&value, optval_, sizeof (int));
108 109 110 111

    switch (option_) {
        case ZMQ_ROUTER_RAW:
            if (is_int && value >= 0) {
112 113
                _raw_socket = (value != 0);
                if (_raw_socket) {
114
                    options.recv_routing_id = false;
115
                    options.raw_socket = true;
116 117 118 119
                }
                return 0;
            }
            break;
120

121 122
        case ZMQ_ROUTER_MANDATORY:
            if (is_int && value >= 0) {
123
                _mandatory = (value != 0);
124 125 126
                return 0;
            }
            break;
127

128
        case ZMQ_PROBE_ROUTER:
129
            if (is_int && value >= 0) {
130
                _probe_router = (value != 0);
131 132 133
                return 0;
            }
            break;
134 135

        case ZMQ_ROUTER_HANDOVER:
136
            if (is_int && value >= 0) {
137
                _handover = (value != 0);
138 139 140
                return 0;
            }
            break;
141

142 143 144 145 146 147 148 149 150 151
#ifdef ZMQ_BUILD_DRAFT_API
        case ZMQ_ROUTER_NOTIFY:
            if (is_int && value >= 0
                && value <= (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT)) {
                options.router_notify = value;
                return 0;
            }
            break;
#endif

152
        default:
153 154
            return routing_socket_base_t::xsetsockopt (option_, optval_,
                                                       optvallen_);
155
    }
156 157
    errno = EINVAL;
    return -1;
158 159
}

160

161
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
162
{
163
    if (0 == _anonymous_pipes.erase (pipe_)) {
164
        erase_out_pipe (pipe_);
165
        _fq.pipe_terminated (pipe_);
166
        pipe_->rollback ();
167 168
        if (pipe_ == _current_out)
            _current_out = NULL;
169
    }
170 171
}

172
void zmq::router_t::xread_activated (pipe_t *pipe_)
173
{
174 175 176
    std::set<pipe_t *>::iterator it = _anonymous_pipes.find (pipe_);
    if (it == _anonymous_pipes.end ())
        _fq.activated (pipe_);
177
    else {
178
        const bool routing_id_ok = identify_peer (pipe_, false);
179
        if (routing_id_ok) {
180 181
            _anonymous_pipes.erase (it);
            _fq.attach (pipe_);
182
        }
183
    }
184 185
}

186
int zmq::router_t::xsend (msg_t *msg_)
187
{
188
    //  If this is the first part of the message it's the ID of the
189
    //  peer to send the message to.
190 191
    if (!_more_out) {
        zmq_assert (!_current_out);
192

193
        //  If we have malformed message (prefix with no subsequent message)
194
        //  then just silently ignore it.
195
        //  TODO: The connections should be killed instead.
196
        if (msg_->flags () & msg_t::more) {
197
            _more_out = true;
198

199
            //  Find the pipe associated with the routing id stored in the prefix.
200
            //  If there's no such pipe just silently ignore the message, unless
201
            //  router_mandatory is set.
202 203 204
            out_pipe_t *out_pipe = lookup_out_pipe (
              blob_t (static_cast<unsigned char *> (msg_->data ()),
                      msg_->size (), zmq::reference_tag_t ()));
205

206 207
            if (out_pipe) {
                _current_out = out_pipe->pipe;
208 209

                // Check whether pipe is closed or not
210
                if (!_current_out->check_write ()) {
211
                    // Check whether pipe is full or not
212
                    bool pipe_full = !_current_out->check_hwm ();
213
                    out_pipe->active = false;
214
                    _current_out = NULL;
215

216 217
                    if (_mandatory) {
                        _more_out = false;
218 219 220 221
                        if (pipe_full)
                            errno = EAGAIN;
                        else
                            errno = EHOSTUNREACH;
222 223 224
                        return -1;
                    }
                }
225 226
            } else if (_mandatory) {
                _more_out = false;
Pieter Hintjens's avatar
Pieter Hintjens committed
227
                errno = EHOSTUNREACH;
228
                return -1;
229
            }
230
        }
231

232 233 234 235
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
236
        return 0;
237 238
    }

Martin Hurton's avatar
Martin Hurton committed
239
    //  Ignore the MORE flag for raw-sock or assert?
240
    if (options.raw_socket)
Martin Hurton's avatar
Martin Hurton committed
241
        msg_->reset_flags (msg_t::more);
242

243
    //  Check whether this is the last part of the message.
244
    _more_out = (msg_->flags () & msg_t::more) != 0;
245

246
    //  Push the message into the pipe. If there's no out pipe, just drop it.
247
    if (_current_out) {
248 249 250
        // Close the remote connection if user has asked to do so
        // by sending zero length message.
        // Pending messages in the pipe will be dropped (on receiving term- ack)
251 252
        if (_raw_socket && msg_->size () == 0) {
            _current_out->terminate (false);
253 254
            int rc = msg_->close ();
            errno_assert (rc == 0);
255 256
            rc = msg_->init ();
            errno_assert (rc == 0);
257
            _current_out = NULL;
258
            return 0;
Martin Hurton's avatar
Martin Hurton committed
259
        }
260

261
        bool ok = _current_out->write (msg_);
262 263 264 265
        if (unlikely (!ok)) {
            // Message failed to send - we must close it ourselves.
            int rc = msg_->close ();
            errno_assert (rc == 0);
266 267
            // HWM was checked before, so the pipe must be gone. Roll back
            // messages that were piped, for example REP labels.
268 269
            _current_out->rollback ();
            _current_out = NULL;
270
        } else {
271 272 273
            if (!_more_out) {
                _current_out->flush ();
                _current_out = NULL;
274
            }
275
        }
276
    } else {
277 278
        int rc = msg_->close ();
        errno_assert (rc == 0);
279
    }
280 281

    //  Detach the message from the data buffer.
282 283
    int rc = msg_->init ();
    errno_assert (rc == 0);
284 285

    return 0;
286 287
}

288
int zmq::router_t::xrecv (msg_t *msg_)
289
{
290 291 292
    if (_prefetched) {
        if (!_routing_id_sent) {
            int rc = msg_->move (_prefetched_id);
293
            errno_assert (rc == 0);
294
            _routing_id_sent = true;
295
        } else {
296
            int rc = msg_->move (_prefetched_msg);
297
            errno_assert (rc == 0);
298
            _prefetched = false;
299
        }
300
        _more_in = (msg_->flags () & msg_t::more) != 0;
301

302 303 304 305
        if (!_more_in) {
            if (_terminate_current_in) {
                _current_in->terminate (true);
                _terminate_current_in = false;
306
            }
307
            _current_in = NULL;
308
        }
309 310 311
        return 0;
    }

312
    pipe_t *pipe = NULL;
313
    int rc = _fq.recvpipe (msg_, &pipe);
Martin Hurton's avatar
Martin Hurton committed
314

315
    //  It's possible that we receive peer's routing id. That happens
Martin Hurton's avatar
Martin Hurton committed
316
    //  after reconnection. The current implementation assumes that
317 318
    //  the peer always uses the same routing id.
    while (rc == 0 && msg_->is_routing_id ())
319
        rc = _fq.recvpipe (msg_, &pipe);
Martin Hurton's avatar
Martin Hurton committed
320 321

    if (rc != 0)
322
        return -1;
323

324
    zmq_assert (pipe != NULL);
325

326
    //  If we are in the middle of reading a message, just return the next part.
327 328
    if (_more_in) {
        _more_in = (msg_->flags () & msg_t::more) != 0;
329

330 331 332 333
        if (!_more_in) {
            if (_terminate_current_in) {
                _current_in->terminate (true);
                _terminate_current_in = false;
334
            }
335
            _current_in = NULL;
336
        }
337
    } else {
338 339 340
        //  We are at the beginning of a message.
        //  Keep the message part we have in the prefetch buffer
        //  and return the ID of the peer instead.
341
        rc = _prefetched_msg.move (*msg_);
342
        errno_assert (rc == 0);
343 344
        _prefetched = true;
        _current_in = pipe;
345

346
        const blob_t &routing_id = pipe->get_routing_id ();
347
        rc = msg_->init_size (routing_id.size ());
348
        errno_assert (rc == 0);
349
        memcpy (msg_->data (), routing_id.data (), routing_id.size ());
350
        msg_->set_flags (msg_t::more);
351 352 353
        if (_prefetched_msg.metadata ())
            msg_->set_metadata (_prefetched_msg.metadata ());
        _routing_id_sent = true;
354
    }
355

356
    return 0;
357 358
}

359
int zmq::router_t::rollback ()
360
{
361 362 363 364
    if (_current_out) {
        _current_out->rollback ();
        _current_out = NULL;
        _more_out = false;
365 366 367 368
    }
    return 0;
}

369
bool zmq::router_t::xhas_in ()
370
{
371
    //  If we are in the middle of reading the messages, there are
372
    //  definitely more parts available.
373
    if (_more_in)
374 375
        return true;

376
    //  We may already have a message pre-fetched.
377
    if (_prefetched)
378
        return true;
379

380 381 382
    //  Try to read the next message.
    //  The message, if read, is kept in the pre-fetch buffer.
    pipe_t *pipe = NULL;
383
    int rc = _fq.recvpipe (&_prefetched_msg, &pipe);
Martin Hurton's avatar
Martin Hurton committed
384

385
    //  It's possible that we receive peer's routing id. That happens
Martin Hurton's avatar
Martin Hurton committed
386
    //  after reconnection. The current implementation assumes that
387 388
    //  the peer always uses the same routing id.
    //  TODO: handle the situation when the peer changes its routing id.
389 390
    while (rc == 0 && _prefetched_msg.is_routing_id ())
        rc = _fq.recvpipe (&_prefetched_msg, &pipe);
Martin Hurton's avatar
Martin Hurton committed
391

392
    if (rc != 0)
393
        return false;
394

Martin Hurton's avatar
Martin Hurton committed
395
    zmq_assert (pipe != NULL);
396

397
    const blob_t &routing_id = pipe->get_routing_id ();
398
    rc = _prefetched_id.init_size (routing_id.size ());
399
    errno_assert (rc == 0);
400 401
    memcpy (_prefetched_id.data (), routing_id.data (), routing_id.size ());
    _prefetched_id.set_flags (msg_t::more);
402

403 404 405
    _prefetched = true;
    _routing_id_sent = false;
    _current_in = pipe;
406

407
    return true;
408 409
}

410
static bool check_pipe_hwm (const zmq::pipe_t &pipe_)
411
{
412
    return pipe_.check_hwm ();
413 414
}

415
bool zmq::router_t::xhas_out ()
416
{
417 418 419 420
    //  In theory, ROUTER socket is always ready for writing (except when
    //  MANDATORY is set). Whether actual attempt to write succeeds depends
    //  on whitch pipe the message is going to be routed to.

421
    if (!_mandatory)
422 423
        return true;

424
    return any_of_out_pipes (check_pipe_hwm);
425 426
}

427 428
int zmq::router_t::get_peer_state (const void *routing_id_,
                                   size_t routing_id_size_) const
429 430 431
{
    int res = 0;

432 433 434 435
    // TODO remove the const_cast, see comment in lookup_out_pipe
    const blob_t routing_id_blob (
      static_cast<unsigned char *> (const_cast<void *> (routing_id_)),
      routing_id_size_);
436 437
    const out_pipe_t *out_pipe = lookup_out_pipe (routing_id_blob);
    if (!out_pipe) {
438 439 440 441
        errno = EHOSTUNREACH;
        return -1;
    }

442
    if (out_pipe->pipe->check_hwm ())
443 444 445 446 447 448 449
        res |= ZMQ_POLLOUT;

    /** \todo does it make any sense to check the inpipe as well? */

    return res;
}

450
bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
451 452
{
    msg_t msg;
453
    blob_t routing_id;
454

455 456
    if (locally_initiated_ && connect_routing_id_is_set ()) {
        const std::string connect_routing_id = extract_connect_routing_id ();
457 458 459 460
        routing_id.set (
          reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
          connect_routing_id.length ());
        //  Not allowed to duplicate an existing rid
461
        zmq_assert (!has_out_pipe (routing_id));
462 463 464 465 466
    } else if (
      options
        .raw_socket) { //  Always assign an integral routing id for raw-socket
        unsigned char buf[5];
        buf[0] = 0;
467
        put_uint32 (buf + 1, _next_integral_routing_id++);
468
        routing_id.set (buf, sizeof buf);
469
    } else if (!options.raw_socket) {
470
        //  Pick up handshake cases and also case where next integral routing id is set
471
        msg.init ();
472
        bool ok = pipe_->read (&msg);
473
        if (!ok)
474
            return false;
475 476

        if (msg.size () == 0) {
477
            //  Fall back on the auto-generation
478 479
            unsigned char buf[5];
            buf[0] = 0;
480
            put_uint32 (buf + 1, _next_integral_routing_id++);
481
            routing_id.set (buf, sizeof buf);
482
            msg.close ();
483
        } else {
484 485
            routing_id.set (static_cast<unsigned char *> (msg.data ()),
                            msg.size ());
486 487
            msg.close ();

488 489
            //  Try to remove an existing routing id entry to allow the new
            //  connection to take the routing id.
490
            out_pipe_t *existing_outpipe = lookup_out_pipe (routing_id);
491

492
            if (existing_outpipe) {
493
                if (!_handover)
Pieter Hintjens's avatar
Pieter Hintjens committed
494
                    //  Ignore peers with duplicate ID
495
                    return false;
496 497 498 499 500 501

                //  We will allow the new connection to take over this
                //  routing id. Temporarily assign a new routing id to the
                //  existing pipe so we can terminate it asynchronously.
                unsigned char buf[5];
                buf[0] = 0;
502
                put_uint32 (buf + 1, _next_integral_routing_id++);
503 504
                blob_t new_routing_id (buf, sizeof buf);

505
                pipe_t *const old_pipe = existing_outpipe->pipe;
506

507 508 509
                erase_out_pipe (old_pipe);
                old_pipe->set_router_socket_routing_id (new_routing_id);
                add_out_pipe (ZMQ_MOVE (new_routing_id), old_pipe);
510

511
                if (old_pipe == _current_in)
512
                    _terminate_current_in = true;
513
                else
514
                    old_pipe->terminate (true);
515
            }
516
        }
517 518
    }

519
    pipe_->set_router_socket_routing_id (routing_id);
520
    add_out_pipe (ZMQ_MOVE (routing_id), pipe_);
521 522 523

    return true;
}