router.cpp 12.7 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
3 4 5 6

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
7
    the terms of the GNU Lesser General Public License as published by
8 9 10 11 12 13
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
    GNU Lesser General Public License for more details.
15

16
    You should have received a copy of the GNU Lesser General Public License
17 18 19
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "router.hpp"
21
#include "pipe.hpp"
22 23
#include "wire.hpp"
#include "random.hpp"
24
#include "likely.hpp"
25
#include "err.hpp"
26

27
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
28
    socket_base_t (parent_, tid_, sid_),
29 30
    prefetched (false),
    identity_sent (false),
31 32
    more_in (false),
    current_out (NULL),
33
    more_out (false),
34
    next_peer_id (generate_random ()),
35
    mandatory (false),
36 37
    //  raw_sock functionality in ROUTER is deprecated
    raw_sock (false),       
38
    probe_router (false),
39
    handover(false)
40
{
41
    options.type = ZMQ_ROUTER;
42
    options.recv_identity = true;
43
    options.raw_sock = false;
44

45
    prefetched_id.init ();
46
    prefetched_msg.init ();
47 48
}

49
zmq::router_t::~router_t ()
50
{
51
    zmq_assert (anonymous_pipes.empty ());;
52
    zmq_assert (outpipes.empty ());
53
    prefetched_id.close ();
54
    prefetched_msg.close ();
55 56
}

57
void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
58
{
59 60
    // subscribe_to_all_ is unused
    (void)subscribe_to_all_;
61

62 63
    zmq_assert (pipe_);

64 65 66 67 68 69 70 71 72 73 74 75 76
    if (probe_router) {
        msg_t probe_msg_;
        int rc = probe_msg_.init ();
        errno_assert (rc == 0);

        rc = pipe_->write (&probe_msg_);
        // zmq_assert (rc) is not applicable here, since it is not a bug.
        pipe_->flush ();

        rc = probe_msg_.close ();
        errno_assert (rc == 0);
    }

77 78 79 80 81
    bool identity_ok = identify_peer (pipe_);
    if (identity_ok)
        fq.attach (pipe_);
    else
        anonymous_pipes.insert (pipe_);
82 83
}

84
int zmq::router_t::xsetsockopt (int option_, const void *optval_,
85 86
    size_t optvallen_)
{
87 88 89 90 91 92
    bool is_int = (optvallen_ == sizeof (int));
    int value = is_int? *((int *) optval_): 0;

    switch (option_) {
        case ZMQ_ROUTER_RAW:
            if (is_int && value >= 0) {
93
                raw_sock = (value != 0);
94 95 96 97 98 99 100
                if (raw_sock) {
                    options.recv_identity = false;
                    options.raw_sock = true;
                }
                return 0;
            }
            break;
101

102 103
        case ZMQ_ROUTER_MANDATORY:
            if (is_int && value >= 0) {
104
                mandatory = (value != 0);
105 106 107
                return 0;
            }
            break;
108

109
        case ZMQ_PROBE_ROUTER:
110
            if (is_int && value >= 0) {
111
                probe_router = (value != 0);
112 113 114
                return 0;
            }
            break;
115
        case ZMQ_ROUTER_HANDOVER: 
116
            if (is_int && value >= 0) {
117
                handover = (value != 0);
118 119 120
                return 0;
            }
            break;
121

122 123
        default:
            break;
124
    }
125 126
    errno = EINVAL;
    return -1;
127 128
}

129

130
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
131
{
132 133 134 135 136 137 138
    std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
    if (it != anonymous_pipes.end ())
        anonymous_pipes.erase (it);
    else {
        outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
        zmq_assert (it != outpipes.end ());
        outpipes.erase (it);
139
        fq.pipe_terminated (pipe_);
140 141
        if (pipe_ == current_out)
            current_out = NULL;
142
    }
143 144
}

145
void zmq::router_t::xread_activated (pipe_t *pipe_)
146
{
147 148 149 150 151
    std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
    if (it == anonymous_pipes.end ())
        fq.activated (pipe_);
    else {
        bool identity_ok = identify_peer (pipe_);
152
        if (identity_ok) {
153
            anonymous_pipes.erase (it);
154 155
            fq.attach (pipe_);
        }
156
    }
157 158
}

159
void zmq::router_t::xwrite_activated (pipe_t *pipe_)
Martin Hurton's avatar
Martin Hurton committed
160
{
161 162 163 164 165 166 167 168
    outpipes_t::iterator it;
    for (it = outpipes.begin (); it != outpipes.end (); ++it)
        if (it->second.pipe == pipe_)
            break;

    zmq_assert (it != outpipes.end ());
    zmq_assert (!it->second.active);
    it->second.active = true;
Martin Hurton's avatar
Martin Hurton committed
169 170
}

171
int zmq::router_t::xsend (msg_t *msg_)
172
{
173
    //  If this is the first part of the message it's the ID of the
174 175 176 177
    //  peer to send the message to.
    if (!more_out) {
        zmq_assert (!current_out);

178
        //  If we have malformed message (prefix with no subsequent message)
179
        //  then just silently ignore it.
180
        //  TODO: The connections should be killed instead.
181
        if (msg_->flags () & msg_t::more) {
182 183 184

            more_out = true;

185
            //  Find the pipe associated with the identity stored in the prefix.
186
            //  If there's no such pipe just silently ignore the message, unless
187
            //  router_mandatory is set.
188 189 190 191 192
            blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
            outpipes_t::iterator it = outpipes.find (identity);

            if (it != outpipes.end ()) {
                current_out = it->second.pipe;
193
                if (!current_out->check_write ()) {
194 195
                    it->second.active = false;
                    current_out = NULL;
196 197 198 199 200
                    if (mandatory) {
                        more_out = false;
                        errno = EAGAIN;
                        return -1;
                    }
201
                }
Martin Hurton's avatar
Martin Hurton committed
202 203
            }
            else
204
            if (mandatory) {
205
                more_out = false;
Pieter Hintjens's avatar
Pieter Hintjens committed
206
                errno = EHOSTUNREACH;
207
                return -1;
208
            }
209
        }
210

211 212 213 214
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
215
        return 0;
216 217
    }

Martin Hurton's avatar
Martin Hurton committed
218 219 220
    //  Ignore the MORE flag for raw-sock or assert?
    if (options.raw_sock)
        msg_->reset_flags (msg_t::more);
221

222
    //  Check whether this is the last part of the message.
223
    more_out = msg_->flags () & msg_t::more ? true : false;
224

225 226
    //  Push the message into the pipe. If there's no out pipe, just drop it.
    if (current_out) {
227 228 229 230

        // Close the remote connection if user has asked to do so
        // by sending zero length message.
        // Pending messages in the pipe will be dropped (on receiving term- ack)
Martin Hurton's avatar
Martin Hurton committed
231 232
        if (raw_sock && msg_->size() == 0) {
            current_out->terminate (false);
233 234
            int rc = msg_->close ();
            errno_assert (rc == 0);
235 236
            rc = msg_->init ();
            errno_assert (rc == 0);
237 238
            current_out = NULL;
            return 0;
Martin Hurton's avatar
Martin Hurton committed
239
        }
240

241
        bool ok = current_out->write (msg_);
242 243
        if (unlikely (!ok))
            current_out = NULL;
244 245
        else
        if (!more_out) {
246 247 248 249 250
            current_out->flush ();
            current_out = NULL;
        }
    }
    else {
251 252
        int rc = msg_->close ();
        errno_assert (rc == 0);
253
    }
254 255

    //  Detach the message from the data buffer.
256 257
    int rc = msg_->init ();
    errno_assert (rc == 0);
258 259

    return 0;
260 261
}

262
int zmq::router_t::xrecv (msg_t *msg_)
263
{
264 265 266 267 268 269 270 271 272 273 274
    if (prefetched) {
        if (!identity_sent) {
            int rc = msg_->move (prefetched_id);
            errno_assert (rc == 0);
            identity_sent = true;
        }
        else {
            int rc = msg_->move (prefetched_msg);
            errno_assert (rc == 0);
            prefetched = false;
        }
275
        more_in = msg_->flags () & msg_t::more ? true : false;
276 277 278
        return 0;
    }

279
    pipe_t *pipe = NULL;
280
    int rc = fq.recvpipe (msg_, &pipe);
Martin Hurton's avatar
Martin Hurton committed
281 282 283 284 285 286 287 288

    //  It's possible that we receive peer's identity. That happens
    //  after reconnection. The current implementation assumes that
    //  the peer always uses the same identity.
    while (rc == 0 && msg_->is_identity ())
        rc = fq.recvpipe (msg_, &pipe);

    if (rc != 0)
289
        return -1;
290

291
    zmq_assert (pipe != NULL);
292

293
    //  If we are in the middle of reading a message, just return the next part.
294
    if (more_in)
295
        more_in = msg_->flags () & msg_t::more ? true : false;
296 297 298 299 300 301 302 303 304 305 306 307 308 309
    else {
        //  We are at the beginning of a message.
        //  Keep the message part we have in the prefetch buffer
        //  and return the ID of the peer instead.
        rc = prefetched_msg.move (*msg_);
        errno_assert (rc == 0);
        prefetched = true;

        blob_t identity = pipe->get_identity ();
        rc = msg_->init_size (identity.size ());
        errno_assert (rc == 0);
        memcpy (msg_->data (), identity.data (), identity.size ());
        msg_->set_flags (msg_t::more);
        identity_sent = true;
310
    }
311

312
    return 0;
313 314
}

315
int zmq::router_t::rollback (void)
316 317 318 319 320 321 322 323 324
{
    if (current_out) {
        current_out->rollback ();
        current_out = NULL;
        more_out = false;
    }
    return 0;
}

325
bool zmq::router_t::xhas_in ()
326
{
327
    //  If we are in the middle of reading the messages, there are
328 329 330 331
    //  definitely more parts available.
    if (more_in)
        return true;

332
    //  We may already have a message pre-fetched.
333
    if (prefetched)
334
        return true;
335

336 337 338
    //  Try to read the next message.
    //  The message, if read, is kept in the pre-fetch buffer.
    pipe_t *pipe = NULL;
339
    int rc = fq.recvpipe (&prefetched_msg, &pipe);
Martin Hurton's avatar
Martin Hurton committed
340 341 342 343 344 345 346 347

    //  It's possible that we receive peer's identity. That happens
    //  after reconnection. The current implementation assumes that
    //  the peer always uses the same identity.
    //  TODO: handle the situation when the peer changes its identity.
    while (rc == 0 && prefetched_msg.is_identity ())
        rc = fq.recvpipe (&prefetched_msg, &pipe);

348
    if (rc != 0)
349
        return false;
350

Martin Hurton's avatar
Martin Hurton committed
351
    zmq_assert (pipe != NULL);
352 353 354 355 356 357 358 359 360

    blob_t identity = pipe->get_identity ();
    rc = prefetched_id.init_size (identity.size ());
    errno_assert (rc == 0);
    memcpy (prefetched_id.data (), identity.data (), identity.size ());
    prefetched_id.set_flags (msg_t::more);

    prefetched = true;
    identity_sent = false;
361

362
    return true;
363 364
}

365
bool zmq::router_t::xhas_out ()
366
{
367
    //  In theory, ROUTER socket is always ready for writing. Whether actual
368 369 370
    //  attempt to write succeeds depends on whitch pipe the message is going
    //  to be routed to.
    return true;
371 372
}

373 374 375 376
bool zmq::router_t::identify_peer (pipe_t *pipe_)
{
    msg_t msg;
    blob_t identity;
377
    bool ok;
378

Martin Hurton's avatar
Martin Hurton committed
379
    if (options.raw_sock) { //  Always assign identity for raw-socket
380 381 382 383
        unsigned char buf [5];
        buf [0] = 0;
        put_uint32 (buf + 1, next_peer_id++);
        identity = blob_t (buf, sizeof buf);
Martin Hurton's avatar
Martin Hurton committed
384 385
    }
    else {
386 387 388
        msg.init ();
        ok = pipe_->read (&msg);
        if (!ok)
389
            return false;
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404

        if (msg.size () == 0) {
            //  Fall back on the auto-generation
            unsigned char buf [5];
            buf [0] = 0;
            put_uint32 (buf + 1, next_peer_id++);
            identity = blob_t (buf, sizeof buf);
            msg.close ();
        }
        else {
            identity = blob_t ((unsigned char*) msg.data (), msg.size ());
            outpipes_t::iterator it = outpipes.find (identity);
            msg.close ();

            if (it != outpipes.end ())
405
            {
406
                if (!handover) {
407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434
                    //  Ignore peers with duplicate ID.
                    return false;
                }
                else
                {
                    //  We will allow the new connection to take over this
                    //  identity. Temporarily assign a new identity to the 
                    //  existing pipe so we can terminate it asynchronously.
                    unsigned char buf [5];
                    buf [0] = 0;
                    put_uint32 (buf + 1, next_peer_id++);
                    blob_t new_identity = blob_t (buf, sizeof buf);

                    it->second.pipe->set_identity (new_identity);
                    outpipe_t existing_outpipe = 
                        {it->second.pipe, it->second.active};
                
                    ok = outpipes.insert (outpipes_t::value_type (
                        new_identity, existing_outpipe)).second;
                    zmq_assert (ok);
                
                    //  Remove the existing identity entry to allow the new
                    //  connection to take the identity.
                    outpipes.erase (it);

                    existing_outpipe.pipe->terminate (true);
                }
            }
435
        }
436 437 438 439 440 441 442 443 444 445
    }

    pipe_->set_identity (identity);
    //  Add the record into output pipes lookup table
    outpipe_t outpipe = {pipe_, true};
    ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
    zmq_assert (ok);

    return true;
}