router.cpp 15 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#include "router.hpp"
33
#include "pipe.hpp"
34 35
#include "wire.hpp"
#include "random.hpp"
36
#include "likely.hpp"
37
#include "err.hpp"
38

39
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40
    socket_base_t (parent_, tid_, sid_),
41 42
    prefetched (false),
    identity_sent (false),
43 44
    current_in (NULL),
    terminate_current_in (false),
45 46
    more_in (false),
    current_out (NULL),
47
    more_out (false),
48
    next_rid (generate_random ()),
49
    mandatory (false),
50
    //  raw_socket functionality in ROUTER is deprecated
51
    raw_socket (false),
52
    probe_router (false),
Pieter Hintjens's avatar
Pieter Hintjens committed
53
    handover (false)
54
{
55
    options.type = ZMQ_ROUTER;
56
    options.recv_identity = true;
57
    options.raw_socket = false;
58

59
    prefetched_id.init ();
60
    prefetched_msg.init ();
61 62
}

63
zmq::router_t::~router_t ()
64
{
65
    zmq_assert (anonymous_pipes.empty ());;
66
    zmq_assert (outpipes.empty ());
67
    prefetched_id.close ();
68
    prefetched_msg.close ();
69 70
}

71
void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
72
{
73
    LIBZMQ_UNUSED (subscribe_to_all_);
74

75 76
    zmq_assert (pipe_);

77 78 79 80 81 82 83 84 85 86 87 88 89
    if (probe_router) {
        msg_t probe_msg_;
        int rc = probe_msg_.init ();
        errno_assert (rc == 0);

        rc = pipe_->write (&probe_msg_);
        // zmq_assert (rc) is not applicable here, since it is not a bug.
        pipe_->flush ();

        rc = probe_msg_.close ();
        errno_assert (rc == 0);
    }

90 91 92 93 94
    bool identity_ok = identify_peer (pipe_);
    if (identity_ok)
        fq.attach (pipe_);
    else
        anonymous_pipes.insert (pipe_);
95 96
}

97
int zmq::router_t::xsetsockopt (int option_, const void *optval_,
98 99
    size_t optvallen_)
{
100
    bool is_int = (optvallen_ == sizeof (int));
101 102
    int value = 0;
    if (is_int) memcpy(&value, optval_, sizeof (int));
103 104

    switch (option_) {
105 106 107
        case ZMQ_CONNECT_RID:
            if (optval_ && optvallen_) {
                connect_rid.assign ((char *) optval_, optvallen_);
108 109
                return 0;
            }
110
            break;
111

112 113
        case ZMQ_ROUTER_RAW:
            if (is_int && value >= 0) {
114 115
                raw_socket = (value != 0);
                if (raw_socket) {
116
                    options.recv_identity = false;
117
                    options.raw_socket = true;
118 119 120 121
                }
                return 0;
            }
            break;
122

123 124
        case ZMQ_ROUTER_MANDATORY:
            if (is_int && value >= 0) {
125
                mandatory = (value != 0);
126 127 128
                return 0;
            }
            break;
129

130
        case ZMQ_PROBE_ROUTER:
131
            if (is_int && value >= 0) {
132
                probe_router = (value != 0);
133 134 135
                return 0;
            }
            break;
136 137

        case ZMQ_ROUTER_HANDOVER:
138
            if (is_int && value >= 0) {
139
                handover = (value != 0);
140 141 142
                return 0;
            }
            break;
143

144 145
        default:
            break;
146
    }
147 148
    errno = EINVAL;
    return -1;
149 150
}

151

152
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
153
{
154 155 156 157
    std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
    if (it != anonymous_pipes.end ())
        anonymous_pipes.erase (it);
    else {
158 159 160
        outpipes_t::iterator iter = outpipes.find (pipe_->get_identity ());
        zmq_assert (iter != outpipes.end ());
        outpipes.erase (iter);
161
        fq.pipe_terminated (pipe_);
162 163
        if (pipe_ == current_out)
            current_out = NULL;
164
    }
165 166
}

167
void zmq::router_t::xread_activated (pipe_t *pipe_)
168
{
169 170 171 172 173
    std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
    if (it == anonymous_pipes.end ())
        fq.activated (pipe_);
    else {
        bool identity_ok = identify_peer (pipe_);
174
        if (identity_ok) {
175
            anonymous_pipes.erase (it);
176 177
            fq.attach (pipe_);
        }
178
    }
179 180
}

181
void zmq::router_t::xwrite_activated (pipe_t *pipe_)
Martin Hurton's avatar
Martin Hurton committed
182
{
183 184 185 186 187 188 189 190
    outpipes_t::iterator it;
    for (it = outpipes.begin (); it != outpipes.end (); ++it)
        if (it->second.pipe == pipe_)
            break;

    zmq_assert (it != outpipes.end ());
    zmq_assert (!it->second.active);
    it->second.active = true;
Martin Hurton's avatar
Martin Hurton committed
191 192
}

193
int zmq::router_t::xsend (msg_t *msg_)
194
{
195
    //  If this is the first part of the message it's the ID of the
196 197 198 199
    //  peer to send the message to.
    if (!more_out) {
        zmq_assert (!current_out);

200
        //  If we have malformed message (prefix with no subsequent message)
201
        //  then just silently ignore it.
202
        //  TODO: The connections should be killed instead.
203
        if (msg_->flags () & msg_t::more) {
204 205 206

            more_out = true;

207
            //  Find the pipe associated with the identity stored in the prefix.
208
            //  If there's no such pipe just silently ignore the message, unless
209
            //  router_mandatory is set.
210 211 212 213 214
            blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
            outpipes_t::iterator it = outpipes.find (identity);

            if (it != outpipes.end ()) {
                current_out = it->second.pipe;
215
                if (!current_out->check_write ()) {
216 217
                    it->second.active = false;
                    current_out = NULL;
218 219 220 221 222
                    if (mandatory) {
                        more_out = false;
                        errno = EAGAIN;
                        return -1;
                    }
223
                }
Martin Hurton's avatar
Martin Hurton committed
224 225
            }
            else
226
            if (mandatory) {
227
                more_out = false;
Pieter Hintjens's avatar
Pieter Hintjens committed
228
                errno = EHOSTUNREACH;
229
                return -1;
230
            }
231
        }
232

233 234 235 236
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
237
        return 0;
238 239
    }

Martin Hurton's avatar
Martin Hurton committed
240
    //  Ignore the MORE flag for raw-sock or assert?
241
    if (options.raw_socket)
Martin Hurton's avatar
Martin Hurton committed
242
        msg_->reset_flags (msg_t::more);
243

244
    //  Check whether this is the last part of the message.
245
    more_out = msg_->flags () & msg_t::more ? true : false;
246

247 248
    //  Push the message into the pipe. If there's no out pipe, just drop it.
    if (current_out) {
249 250 251 252

        // Close the remote connection if user has asked to do so
        // by sending zero length message.
        // Pending messages in the pipe will be dropped (on receiving term- ack)
253
        if (raw_socket && msg_->size() == 0) {
Martin Hurton's avatar
Martin Hurton committed
254
            current_out->terminate (false);
255 256
            int rc = msg_->close ();
            errno_assert (rc == 0);
257 258
            rc = msg_->init ();
            errno_assert (rc == 0);
259 260
            current_out = NULL;
            return 0;
Martin Hurton's avatar
Martin Hurton committed
261
        }
262

263
        bool ok = current_out->write (msg_);
264 265 266 267
        if (unlikely (!ok)) {
            // Message failed to send - we must close it ourselves.
            int rc = msg_->close ();
            errno_assert (rc == 0);
268
            current_out = NULL;
269 270 271 272 273
        } else {
          if (!more_out) {
              current_out->flush ();
              current_out = NULL;
          }
274 275 276
        }
    }
    else {
277 278
        int rc = msg_->close ();
        errno_assert (rc == 0);
279
    }
280 281

    //  Detach the message from the data buffer.
282 283
    int rc = msg_->init ();
    errno_assert (rc == 0);
284 285

    return 0;
286 287
}

288
int zmq::router_t::xrecv (msg_t *msg_)
289
{
290 291 292 293 294 295 296 297 298 299 300
    if (prefetched) {
        if (!identity_sent) {
            int rc = msg_->move (prefetched_id);
            errno_assert (rc == 0);
            identity_sent = true;
        }
        else {
            int rc = msg_->move (prefetched_msg);
            errno_assert (rc == 0);
            prefetched = false;
        }
301
        more_in = msg_->flags () & msg_t::more ? true : false;
302 303 304 305 306 307 308 309
 
        if (!more_in) {
            if (terminate_current_in) {
                current_in->terminate (true);
                terminate_current_in = false;
            }
            current_in = NULL;
        }
310 311 312
        return 0;
    }

313
    pipe_t *pipe = NULL;
314
    int rc = fq.recvpipe (msg_, &pipe);
Martin Hurton's avatar
Martin Hurton committed
315 316 317 318 319 320 321 322

    //  It's possible that we receive peer's identity. That happens
    //  after reconnection. The current implementation assumes that
    //  the peer always uses the same identity.
    while (rc == 0 && msg_->is_identity ())
        rc = fq.recvpipe (msg_, &pipe);

    if (rc != 0)
323
        return -1;
324

325
    zmq_assert (pipe != NULL);
326

327
    //  If we are in the middle of reading a message, just return the next part.
328
    if (more_in) {
329
        more_in = msg_->flags () & msg_t::more ? true : false;
330 331 332 333 334 335 336 337 338

        if (!more_in) {
            if (terminate_current_in) {
                current_in->terminate (true);
                terminate_current_in = false;
            }
            current_in = NULL;
        }
    }
339 340 341 342 343 344 345
    else {
        //  We are at the beginning of a message.
        //  Keep the message part we have in the prefetch buffer
        //  and return the ID of the peer instead.
        rc = prefetched_msg.move (*msg_);
        errno_assert (rc == 0);
        prefetched = true;
346
        current_in = pipe;
347 348 349 350 351 352

        blob_t identity = pipe->get_identity ();
        rc = msg_->init_size (identity.size ());
        errno_assert (rc == 0);
        memcpy (msg_->data (), identity.data (), identity.size ());
        msg_->set_flags (msg_t::more);
353 354
        if (prefetched_msg.metadata())
            msg_->set_metadata(prefetched_msg.metadata());
355
        identity_sent = true;
356
    }
357

358
    return 0;
359 360
}

361
int zmq::router_t::rollback (void)
362 363 364 365 366 367 368 369 370
{
    if (current_out) {
        current_out->rollback ();
        current_out = NULL;
        more_out = false;
    }
    return 0;
}

371
bool zmq::router_t::xhas_in ()
372
{
373
    //  If we are in the middle of reading the messages, there are
374 375 376 377
    //  definitely more parts available.
    if (more_in)
        return true;

378
    //  We may already have a message pre-fetched.
379
    if (prefetched)
380
        return true;
381

382 383 384
    //  Try to read the next message.
    //  The message, if read, is kept in the pre-fetch buffer.
    pipe_t *pipe = NULL;
385
    int rc = fq.recvpipe (&prefetched_msg, &pipe);
Martin Hurton's avatar
Martin Hurton committed
386 387 388 389 390 391 392 393

    //  It's possible that we receive peer's identity. That happens
    //  after reconnection. The current implementation assumes that
    //  the peer always uses the same identity.
    //  TODO: handle the situation when the peer changes its identity.
    while (rc == 0 && prefetched_msg.is_identity ())
        rc = fq.recvpipe (&prefetched_msg, &pipe);

394
    if (rc != 0)
395
        return false;
396

Martin Hurton's avatar
Martin Hurton committed
397
    zmq_assert (pipe != NULL);
398 399 400 401 402 403 404 405 406

    blob_t identity = pipe->get_identity ();
    rc = prefetched_id.init_size (identity.size ());
    errno_assert (rc == 0);
    memcpy (prefetched_id.data (), identity.data (), identity.size ());
    prefetched_id.set_flags (msg_t::more);

    prefetched = true;
    identity_sent = false;
407
    current_in = pipe;
408

409
    return true;
410 411
}

412
bool zmq::router_t::xhas_out ()
413
{
414
    //  In theory, ROUTER socket is always ready for writing. Whether actual
415
    //  attempt to write succeeds depends on which pipe the message is going
416 417
    //  to be routed to.
    return true;
418 419
}

420 421 422 423 424
zmq::blob_t zmq::router_t::get_credential () const
{
    return fq.get_credential ();
}

425 426 427 428
bool zmq::router_t::identify_peer (pipe_t *pipe_)
{
    msg_t msg;
    blob_t identity;
429
    bool ok;
430

431 432 433 434
    if (connect_rid.length()) {
        identity = blob_t ((unsigned char*) connect_rid.c_str (),
            connect_rid.length());
        connect_rid.clear ();
435
        outpipes_t::iterator it = outpipes.find (identity);
436
        if (it != outpipes.end ())
437
            zmq_assert(false); //  Not allowed to duplicate an existing rid
438
    }
439
    else
440
    if (options.raw_socket) { //  Always assign identity for raw-socket
441
        unsigned char buf [5];
442
        buf [0] = 0;
443
        put_uint32 (buf + 1, next_rid++);
444
        identity = blob_t (buf, sizeof buf);
Martin Hurton's avatar
Martin Hurton committed
445
    }
Tim M's avatar
Tim M committed
446
    else
447
    if (!options.raw_socket) {
448
        //  Pick up handshake cases and also case where next identity is set
449 450 451
        msg.init ();
        ok = pipe_->read (&msg);
        if (!ok)
452
            return false;
453 454

        if (msg.size () == 0) {
455 456 457
            //  Fall back on the auto-generation
            unsigned char buf [5];
            buf [0] = 0;
458
            put_uint32 (buf + 1, next_rid++);
459 460 461 462 463 464 465 466
            identity = blob_t (buf, sizeof buf);
            msg.close ();
        }
        else {
            identity = blob_t ((unsigned char*) msg.data (), msg.size ());
            outpipes_t::iterator it = outpipes.find (identity);
            msg.close ();

Pieter Hintjens's avatar
Pieter Hintjens committed
467 468 469
            if (it != outpipes.end ()) {
                if (!handover)
                    //  Ignore peers with duplicate ID
470
                    return false;
Pieter Hintjens's avatar
Pieter Hintjens committed
471
                else {
472
                    //  We will allow the new connection to take over this
473
                    //  identity. Temporarily assign a new identity to the
474 475 476
                    //  existing pipe so we can terminate it asynchronously.
                    unsigned char buf [5];
                    buf [0] = 0;
477
                    put_uint32 (buf + 1, next_rid++);
478 479 480
                    blob_t new_identity = blob_t (buf, sizeof buf);

                    it->second.pipe->set_identity (new_identity);
481
                    outpipe_t existing_outpipe =
482
                        {it->second.pipe, it->second.active};
483

484 485 486
                    ok = outpipes.insert (outpipes_t::value_type (
                        new_identity, existing_outpipe)).second;
                    zmq_assert (ok);
487

488 489 490 491
                    //  Remove the existing identity entry to allow the new
                    //  connection to take the identity.
                    outpipes.erase (it);

492 493 494 495
                    if (existing_outpipe.pipe == current_in)
                        terminate_current_in = true;
                    else
                        existing_outpipe.pipe->terminate (true);
496 497
                }
            }
498
        }
499 500 501 502 503 504 505 506 507 508
    }

    pipe_->set_identity (identity);
    //  Add the record into output pipes lookup table
    outpipe_t outpipe = {pipe_, true};
    ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
    zmq_assert (ok);

    return true;
}