router.cpp 14.3 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "macros.hpp"
31
#include "router.hpp"
32
#include "pipe.hpp"
33 34
#include "wire.hpp"
#include "random.hpp"
35
#include "likely.hpp"
36
#include "err.hpp"
37

38
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
39
    socket_base_t (parent_, tid_, sid_),
40 41
    prefetched (false),
    identity_sent (false),
42 43
    more_in (false),
    current_out (NULL),
44
    more_out (false),
45
    next_rid (generate_random ()),
46
    mandatory (false),
47
    //  raw_socket functionality in ROUTER is deprecated
48
    raw_socket (false),
49
    probe_router (false),
Pieter Hintjens's avatar
Pieter Hintjens committed
50
    handover (false)
51
{
52
    options.type = ZMQ_ROUTER;
53
    options.recv_identity = true;
54
    options.raw_socket = false;
55

56
    prefetched_id.init ();
57
    prefetched_msg.init ();
58 59
}

60
zmq::router_t::~router_t ()
61
{
62
    zmq_assert (anonymous_pipes.empty ());;
63
    zmq_assert (outpipes.empty ());
64
    prefetched_id.close ();
65
    prefetched_msg.close ();
66 67
}

68
void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
69
{
70
    LIBZMQ_UNUSED (subscribe_to_all_);
71

72 73
    zmq_assert (pipe_);

74 75 76 77 78 79 80 81 82 83 84 85 86
    if (probe_router) {
        msg_t probe_msg_;
        int rc = probe_msg_.init ();
        errno_assert (rc == 0);

        rc = pipe_->write (&probe_msg_);
        // zmq_assert (rc) is not applicable here, since it is not a bug.
        pipe_->flush ();

        rc = probe_msg_.close ();
        errno_assert (rc == 0);
    }

87 88 89 90 91
    bool identity_ok = identify_peer (pipe_);
    if (identity_ok)
        fq.attach (pipe_);
    else
        anonymous_pipes.insert (pipe_);
92 93
}

94
int zmq::router_t::xsetsockopt (int option_, const void *optval_,
95 96
    size_t optvallen_)
{
97 98 99 100
    bool is_int = (optvallen_ == sizeof (int));
    int value = is_int? *((int *) optval_): 0;

    switch (option_) {
101 102 103
        case ZMQ_CONNECT_RID:
            if (optval_ && optvallen_) {
                connect_rid.assign ((char *) optval_, optvallen_);
104 105
                return 0;
            }
106
            break;
107

108 109
        case ZMQ_ROUTER_RAW:
            if (is_int && value >= 0) {
110 111
                raw_socket = (value != 0);
                if (raw_socket) {
112
                    options.recv_identity = false;
113
                    options.raw_socket = true;
114 115 116 117
                }
                return 0;
            }
            break;
118

119 120
        case ZMQ_ROUTER_MANDATORY:
            if (is_int && value >= 0) {
121
                mandatory = (value != 0);
122 123 124
                return 0;
            }
            break;
125

126
        case ZMQ_PROBE_ROUTER:
127
            if (is_int && value >= 0) {
128
                probe_router = (value != 0);
129 130 131
                return 0;
            }
            break;
132 133

        case ZMQ_ROUTER_HANDOVER:
134
            if (is_int && value >= 0) {
135
                handover = (value != 0);
136 137 138
                return 0;
            }
            break;
139

140 141
        default:
            break;
142
    }
143 144
    errno = EINVAL;
    return -1;
145 146
}

147

148
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
149
{
150 151 152 153 154 155 156
    std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
    if (it != anonymous_pipes.end ())
        anonymous_pipes.erase (it);
    else {
        outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
        zmq_assert (it != outpipes.end ());
        outpipes.erase (it);
157
        fq.pipe_terminated (pipe_);
158 159
        if (pipe_ == current_out)
            current_out = NULL;
160
    }
161 162
}

163
void zmq::router_t::xread_activated (pipe_t *pipe_)
164
{
165 166 167 168 169
    std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
    if (it == anonymous_pipes.end ())
        fq.activated (pipe_);
    else {
        bool identity_ok = identify_peer (pipe_);
170
        if (identity_ok) {
171
            anonymous_pipes.erase (it);
172 173
            fq.attach (pipe_);
        }
174
    }
175 176
}

177
void zmq::router_t::xwrite_activated (pipe_t *pipe_)
Martin Hurton's avatar
Martin Hurton committed
178
{
179 180 181 182 183 184 185 186
    outpipes_t::iterator it;
    for (it = outpipes.begin (); it != outpipes.end (); ++it)
        if (it->second.pipe == pipe_)
            break;

    zmq_assert (it != outpipes.end ());
    zmq_assert (!it->second.active);
    it->second.active = true;
Martin Hurton's avatar
Martin Hurton committed
187 188
}

189
int zmq::router_t::xsend (msg_t *msg_)
190
{
191
    //  If this is the first part of the message it's the ID of the
192 193 194 195
    //  peer to send the message to.
    if (!more_out) {
        zmq_assert (!current_out);

196
        //  If we have malformed message (prefix with no subsequent message)
197
        //  then just silently ignore it.
198
        //  TODO: The connections should be killed instead.
199
        if (msg_->flags () & msg_t::more) {
200 201 202

            more_out = true;

203
            //  Find the pipe associated with the identity stored in the prefix.
204
            //  If there's no such pipe just silently ignore the message, unless
205
            //  router_mandatory is set.
206 207 208 209 210
            blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
            outpipes_t::iterator it = outpipes.find (identity);

            if (it != outpipes.end ()) {
                current_out = it->second.pipe;
211
                if (!current_out->check_write ()) {
212 213
                    it->second.active = false;
                    current_out = NULL;
214 215 216 217 218
                    if (mandatory) {
                        more_out = false;
                        errno = EAGAIN;
                        return -1;
                    }
219
                }
Martin Hurton's avatar
Martin Hurton committed
220 221
            }
            else
222
            if (mandatory) {
223
                more_out = false;
Pieter Hintjens's avatar
Pieter Hintjens committed
224
                errno = EHOSTUNREACH;
225
                return -1;
226
            }
227
        }
228

229 230 231 232
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
233
        return 0;
234 235
    }

Martin Hurton's avatar
Martin Hurton committed
236
    //  Ignore the MORE flag for raw-sock or assert?
237
    if (options.raw_socket)
Martin Hurton's avatar
Martin Hurton committed
238
        msg_->reset_flags (msg_t::more);
239

240
    //  Check whether this is the last part of the message.
241
    more_out = msg_->flags () & msg_t::more ? true : false;
242

243 244
    //  Push the message into the pipe. If there's no out pipe, just drop it.
    if (current_out) {
245 246 247 248

        // Close the remote connection if user has asked to do so
        // by sending zero length message.
        // Pending messages in the pipe will be dropped (on receiving term- ack)
249
        if (raw_socket && msg_->size() == 0) {
Martin Hurton's avatar
Martin Hurton committed
250
            current_out->terminate (false);
251 252
            int rc = msg_->close ();
            errno_assert (rc == 0);
253 254
            rc = msg_->init ();
            errno_assert (rc == 0);
255 256
            current_out = NULL;
            return 0;
Martin Hurton's avatar
Martin Hurton committed
257
        }
258

259
        bool ok = current_out->write (msg_);
260 261 262 263
        if (unlikely (!ok)) {
            // Message failed to send - we must close it ourselves.
            int rc = msg_->close ();
            errno_assert (rc == 0);
264
            current_out = NULL;
265 266 267 268 269
        } else {
          if (!more_out) {
              current_out->flush ();
              current_out = NULL;
          }
270 271 272
        }
    }
    else {
273 274
        int rc = msg_->close ();
        errno_assert (rc == 0);
275
    }
276 277

    //  Detach the message from the data buffer.
278 279
    int rc = msg_->init ();
    errno_assert (rc == 0);
280 281

    return 0;
282 283
}

284
int zmq::router_t::xrecv (msg_t *msg_)
285
{
286 287 288 289 290 291 292 293 294 295 296
    if (prefetched) {
        if (!identity_sent) {
            int rc = msg_->move (prefetched_id);
            errno_assert (rc == 0);
            identity_sent = true;
        }
        else {
            int rc = msg_->move (prefetched_msg);
            errno_assert (rc == 0);
            prefetched = false;
        }
297
        more_in = msg_->flags () & msg_t::more ? true : false;
298 299 300
        return 0;
    }

301
    pipe_t *pipe = NULL;
302
    int rc = fq.recvpipe (msg_, &pipe);
Martin Hurton's avatar
Martin Hurton committed
303 304 305 306 307 308 309 310

    //  It's possible that we receive peer's identity. That happens
    //  after reconnection. The current implementation assumes that
    //  the peer always uses the same identity.
    while (rc == 0 && msg_->is_identity ())
        rc = fq.recvpipe (msg_, &pipe);

    if (rc != 0)
311
        return -1;
312

313
    zmq_assert (pipe != NULL);
314

315
    //  If we are in the middle of reading a message, just return the next part.
316
    if (more_in)
317
        more_in = msg_->flags () & msg_t::more ? true : false;
318 319 320 321 322 323 324 325 326 327 328 329 330
    else {
        //  We are at the beginning of a message.
        //  Keep the message part we have in the prefetch buffer
        //  and return the ID of the peer instead.
        rc = prefetched_msg.move (*msg_);
        errno_assert (rc == 0);
        prefetched = true;

        blob_t identity = pipe->get_identity ();
        rc = msg_->init_size (identity.size ());
        errno_assert (rc == 0);
        memcpy (msg_->data (), identity.data (), identity.size ());
        msg_->set_flags (msg_t::more);
331 332
        if (prefetched_msg.metadata())
            msg_->set_metadata(prefetched_msg.metadata());
333
        identity_sent = true;
334
    }
335

336
    return 0;
337 338
}

339
int zmq::router_t::rollback (void)
340 341 342 343 344 345 346 347 348
{
    if (current_out) {
        current_out->rollback ();
        current_out = NULL;
        more_out = false;
    }
    return 0;
}

349
bool zmq::router_t::xhas_in ()
350
{
351
    //  If we are in the middle of reading the messages, there are
352 353 354 355
    //  definitely more parts available.
    if (more_in)
        return true;

356
    //  We may already have a message pre-fetched.
357
    if (prefetched)
358
        return true;
359

360 361 362
    //  Try to read the next message.
    //  The message, if read, is kept in the pre-fetch buffer.
    pipe_t *pipe = NULL;
363
    int rc = fq.recvpipe (&prefetched_msg, &pipe);
Martin Hurton's avatar
Martin Hurton committed
364 365 366 367 368 369 370 371

    //  It's possible that we receive peer's identity. That happens
    //  after reconnection. The current implementation assumes that
    //  the peer always uses the same identity.
    //  TODO: handle the situation when the peer changes its identity.
    while (rc == 0 && prefetched_msg.is_identity ())
        rc = fq.recvpipe (&prefetched_msg, &pipe);

372
    if (rc != 0)
373
        return false;
374

Martin Hurton's avatar
Martin Hurton committed
375
    zmq_assert (pipe != NULL);
376 377 378 379 380 381 382 383 384

    blob_t identity = pipe->get_identity ();
    rc = prefetched_id.init_size (identity.size ());
    errno_assert (rc == 0);
    memcpy (prefetched_id.data (), identity.data (), identity.size ());
    prefetched_id.set_flags (msg_t::more);

    prefetched = true;
    identity_sent = false;
385

386
    return true;
387 388
}

389
bool zmq::router_t::xhas_out ()
390
{
391
    //  In theory, ROUTER socket is always ready for writing. Whether actual
392
    //  attempt to write succeeds depends on which pipe the message is going
393 394
    //  to be routed to.
    return true;
395 396
}

397 398 399 400 401
zmq::blob_t zmq::router_t::get_credential () const
{
    return fq.get_credential ();
}

402 403 404 405
bool zmq::router_t::identify_peer (pipe_t *pipe_)
{
    msg_t msg;
    blob_t identity;
406
    bool ok;
407

408 409 410 411
    if (connect_rid.length()) {
        identity = blob_t ((unsigned char*) connect_rid.c_str (),
            connect_rid.length());
        connect_rid.clear ();
412
        outpipes_t::iterator it = outpipes.find (identity);
413
        if (it != outpipes.end ())
414
            zmq_assert(false); //  Not allowed to duplicate an existing rid
415
    }
416
    else
417
    if (options.raw_socket) { //  Always assign identity for raw-socket
418
        unsigned char buf [5];
419
        buf [0] = 0;
420
        put_uint32 (buf + 1, next_rid++);
421
        identity = blob_t (buf, sizeof buf);
Martin Hurton's avatar
Martin Hurton committed
422
    }
Tim M's avatar
Tim M committed
423
    else
424
    if (!options.raw_socket) {
425
        //  Pick up handshake cases and also case where next identity is set
426 427 428
        msg.init ();
        ok = pipe_->read (&msg);
        if (!ok)
429
            return false;
430 431

        if (msg.size () == 0) {
432 433 434
            //  Fall back on the auto-generation
            unsigned char buf [5];
            buf [0] = 0;
435
            put_uint32 (buf + 1, next_rid++);
436 437 438 439 440 441 442 443
            identity = blob_t (buf, sizeof buf);
            msg.close ();
        }
        else {
            identity = blob_t ((unsigned char*) msg.data (), msg.size ());
            outpipes_t::iterator it = outpipes.find (identity);
            msg.close ();

Pieter Hintjens's avatar
Pieter Hintjens committed
444 445 446
            if (it != outpipes.end ()) {
                if (!handover)
                    //  Ignore peers with duplicate ID
447
                    return false;
Pieter Hintjens's avatar
Pieter Hintjens committed
448
                else {
449
                    //  We will allow the new connection to take over this
450
                    //  identity. Temporarily assign a new identity to the
451 452 453
                    //  existing pipe so we can terminate it asynchronously.
                    unsigned char buf [5];
                    buf [0] = 0;
454
                    put_uint32 (buf + 1, next_rid++);
455 456 457
                    blob_t new_identity = blob_t (buf, sizeof buf);

                    it->second.pipe->set_identity (new_identity);
458
                    outpipe_t existing_outpipe =
459
                        {it->second.pipe, it->second.active};
460

461 462 463
                    ok = outpipes.insert (outpipes_t::value_type (
                        new_identity, existing_outpipe)).second;
                    zmq_assert (ok);
464

465 466 467 468 469 470 471
                    //  Remove the existing identity entry to allow the new
                    //  connection to take the identity.
                    outpipes.erase (it);

                    existing_outpipe.pipe->terminate (true);
                }
            }
472
        }
473 474 475 476 477 478 479 480 481 482
    }

    pipe_->set_identity (identity);
    //  Add the record into output pipes lookup table
    outpipe_t outpipe = {pipe_, true};
    ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
    zmq_assert (ok);

    return true;
}