router.cpp 12.7 KB
Newer Older
1
/*
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "router.hpp"
21
#include "pipe.hpp"
22 23
#include "wire.hpp"
#include "random.hpp"
24
#include "likely.hpp"
25
#include "err.hpp"
26

27
//  Builds a ROUTER socket: every routing flag starts cleared, the peer-id
//  counter is seeded from generate_random (), and both prefetch buffers are
//  initialised so that they can always be closed by the destructor.
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_)
    : socket_base_t (parent_, tid_, sid_),
      prefetched (false),
      identity_sent (false),
      more_in (false),
      current_out (NULL),
      more_out (false),
      next_peer_id (generate_random ()),
      mandatory (false),
      //  raw_sock functionality in ROUTER is deprecated
      raw_sock (false),
      probe_router (false),
      handover (false)
{
    //  Prepare the buffers used by xhas_in ()/xrecv () for prefetching.
    prefetched_msg.init ();
    prefetched_id.init ();

    //  Socket-level options specific to ROUTER.
    options.raw_sock = false;
    options.recv_identity = true;
    options.type = ZMQ_ROUTER;
}

49
//  Tears the socket down. Both pipe containers are asserted to be empty at
//  this point; afterwards the two prefetch buffers are released.
zmq::router_t::~router_t ()
{
    //  No pipe, identified or anonymous, may still be registered.
    zmq_assert (anonymous_pipes.empty ());
    zmq_assert (outpipes.empty ());

    //  Release the prefetch buffers initialised by the constructor.
    prefetched_id.close ();
    prefetched_msg.close ();
}

57
//  Registers a newly attached pipe. When probe_router is set, a freshly
//  initialised message is written to the pipe first. The pipe is then either
//  identified straight away (and handed to the fair queue) or parked in
//  anonymous_pipes until identification succeeds later.
void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
    //  This socket type makes no use of subscribe_to_all_.
    (void) subscribe_to_all_;

    zmq_assert (pipe_);

    if (probe_router) {
        msg_t probe;
        int rc = probe.init ();
        errno_assert (rc == 0);

        //  The outcome of the write is deliberately not asserted:
        //  a failed write is not a bug here.
        rc = pipe_->write (&probe);
        pipe_->flush ();

        rc = probe.close ();
        errno_assert (rc == 0);
    }

    //  Pipes that cannot be identified yet are kept aside; see
    //  xread_activated () for the retry.
    if (identify_peer (pipe_))
        fq.attach (pipe_);
    else
        anonymous_pipes.insert (pipe_);
}

84
int zmq::router_t::xsetsockopt (int option_, const void *optval_,
85 86
    size_t optvallen_)
{
87 88 89 90 91 92
    bool is_int = (optvallen_ == sizeof (int));
    int value = is_int? *((int *) optval_): 0;

    switch (option_) {
        case ZMQ_ROUTER_RAW:
            if (is_int && value >= 0) {
93
                raw_sock = (value != 0);
94 95 96 97 98 99 100
                if (raw_sock) {
                    options.recv_identity = false;
                    options.raw_sock = true;
                }
                return 0;
            }
            break;
101

102 103
        case ZMQ_ROUTER_MANDATORY:
            if (is_int && value >= 0) {
104
                mandatory = (value != 0);
105 106 107
                return 0;
            }
            break;
108

109
        case ZMQ_PROBE_ROUTER:
110
            if (is_int && value >= 0) {
111
                probe_router = (value != 0);
112 113 114
                return 0;
            }
            break;
Pieter Hintjens's avatar
Pieter Hintjens committed
115
            
116
        case ZMQ_ROUTER_HANDOVER: 
117
            if (is_int && value >= 0) {
118
                handover = (value != 0);
119 120 121
                return 0;
            }
            break;
122

123 124
        default:
            break;
125
    }
126 127
    errno = EINVAL;
    return -1;
128 129
}

130

131
//  Forgets a pipe that has been terminated. A pipe found in anonymous_pipes
//  is simply dropped from that set; any other pipe must have an entry in
//  outpipes, which is removed, and the fair queue is notified.
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
{
    std::set <pipe_t*>::iterator anonymous = anonymous_pipes.find (pipe_);
    if (anonymous != anonymous_pipes.end ()) {
        anonymous_pipes.erase (anonymous);
        return;
    }

    //  Identified pipe: it has to be present in the outbound lookup table.
    outpipes_t::iterator entry = outpipes.find (pipe_->get_identity ());
    zmq_assert (entry != outpipes.end ());
    outpipes.erase (entry);

    fq.pipe_terminated (pipe_);

    //  Do not keep pointing at a pipe that is going away.
    if (current_out == pipe_)
        current_out = NULL;
}

146
//  Called when a pipe becomes readable. Identified pipes are forwarded to
//  the fair queue. For a pipe still waiting in anonymous_pipes,
//  identification is retried; on success the pipe leaves the anonymous set
//  and is attached to the fair queue.
void zmq::router_t::xread_activated (pipe_t *pipe_)
{
    std::set <pipe_t*>::iterator anonymous = anonymous_pipes.find (pipe_);
    if (anonymous == anonymous_pipes.end ()) {
        fq.activated (pipe_);
        return;
    }

    //  Still anonymous: try once more to learn the peer's identity.
    if (identify_peer (pipe_)) {
        anonymous_pipes.erase (anonymous);
        fq.attach (pipe_);
    }
}

160
//  Called when a pipe becomes writable again. The matching outpipes entry
//  must exist and must currently be marked inactive; it is flipped back to
//  active.
void zmq::router_t::xwrite_activated (pipe_t *pipe_)
{
    //  outpipes is keyed by identity, so the pipe is located by scanning.
    outpipes_t::iterator entry = outpipes.begin ();
    while (entry != outpipes.end () && entry->second.pipe != pipe_)
        ++entry;

    zmq_assert (entry != outpipes.end ());
    zmq_assert (!entry->second.active);
    entry->second.active = true;
}

172
//  Sends one message part.
//
//  The first part of every outgoing message is the identity of the peer the
//  message is routed to; it selects current_out and is never forwarded
//  itself. The parts that follow are written to current_out, or dropped
//  when no pipe was selected.
//
//  Returns 0 when the part was consumed (sent or dropped). Returns -1 only
//  while processing the identity part with 'mandatory' set: errno is
//  EHOSTUNREACH when no pipe is registered under the identity and EAGAIN
//  when the pipe exists but check_write () fails. In both error cases msg_
//  is left untouched.
int zmq::router_t::xsend (msg_t *msg_)
{
    //  If this is the first part of the message it's the ID of the
    //  peer to send the message to.
    if (!more_out) {
        zmq_assert (!current_out);

        //  If we have malformed message (prefix with no subsequent message)
        //  then just silently ignore it.
        //  TODO: The connections should be killed instead.
        if (msg_->flags () & msg_t::more) {

            more_out = true;

            //  Find the pipe associated with the identity stored in the
            //  prefix. If there's no such pipe just silently ignore the
            //  message, unless router_mandatory is set.
            blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
            outpipes_t::iterator it = outpipes.find (identity);

            if (it != outpipes.end ()) {
                current_out = it->second.pipe;
                if (!current_out->check_write ()) {
                    //  The pipe cannot take a message right now: mark it
                    //  inactive and treat the peer as unreachable for this
                    //  message.
                    it->second.active = false;
                    current_out = NULL;
                    if (mandatory) {
                        more_out = false;
                        errno = EAGAIN;
                        return -1;
                    }
                }
            }
            else
            if (mandatory) {
                more_out = false;
                errno = EHOSTUNREACH;
                return -1;
            }
        }

        //  The identity part is consumed here, never forwarded.
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
        return 0;
    }

    //  Ignore the MORE flag for raw-sock or assert?
    if (options.raw_sock)
        msg_->reset_flags (msg_t::more);

    //  Check whether this is the last part of the message.
    more_out = msg_->flags () & msg_t::more ? true : false;

    //  Push the message into the pipe. If there's no out pipe, just drop it.
    if (current_out) {

        //  Close the remote connection if user has asked to do so
        //  by sending zero length message.
        //  Pending messages in the pipe will be dropped (on receiving
        //  term-ack).
        if (raw_sock && msg_->size () == 0) {
            current_out->terminate (false);
            int rc = msg_->close ();
            errno_assert (rc == 0);
            rc = msg_->init ();
            errno_assert (rc == 0);
            current_out = NULL;
            return 0;
        }

        bool ok = current_out->write (msg_);
        if (unlikely (!ok)) {
            //  The pipe did not accept the message, so the message is still
            //  ours: close it here, otherwise the init () below would
            //  discard it without releasing its content.
            int rc = msg_->close ();
            errno_assert (rc == 0);
            current_out = NULL;
        }
        else
        if (!more_out) {
            current_out->flush ();
            current_out = NULL;
        }
    }
    else {
        int rc = msg_->close ();
        errno_assert (rc == 0);
    }

    //  Detach the message from the data buffer.
    int rc = msg_->init ();
    errno_assert (rc == 0);

    return 0;
}

263
//  Receives one message part. At the start of each incoming message the
//  sender's identity is returned first (flagged with 'more'), followed by
//  the parts actually read from the pipe.
//
//  Returns 0 when a part was stored in msg_, or -1 when fq.recvpipe ()
//  reports failure.
int zmq::router_t::xrecv (msg_t *msg_)
{
    //  Serve data stashed by xhas_in () or by an earlier call: the identity
    //  frame goes out first, the prefetched part second.
    if (prefetched) {
        int rc;
        if (identity_sent) {
            rc = msg_->move (prefetched_msg);
            errno_assert (rc == 0);
            prefetched = false;
        }
        else {
            rc = msg_->move (prefetched_id);
            errno_assert (rc == 0);
            identity_sent = true;
        }
        more_in = (msg_->flags () & msg_t::more) != 0;
        return 0;
    }

    //  It's possible that we receive peer's identity. That happens
    //  after reconnection. The current implementation assumes that
    //  the peer always uses the same identity, so such frames are skipped.
    pipe_t *source = NULL;
    int rc;
    do {
        rc = fq.recvpipe (msg_, &source);
    } while (rc == 0 && msg_->is_identity ());

    if (rc != 0)
        return -1;

    zmq_assert (source != NULL);

    //  In the middle of a multi-part message: hand the part over as is.
    if (more_in) {
        more_in = (msg_->flags () & msg_t::more) != 0;
        return 0;
    }

    //  Beginning of a new message: keep the part just read in the prefetch
    //  buffer and return the ID of the peer instead.
    rc = prefetched_msg.move (*msg_);
    errno_assert (rc == 0);
    prefetched = true;

    blob_t peer_id = source->get_identity ();
    rc = msg_->init_size (peer_id.size ());
    errno_assert (rc == 0);
    memcpy (msg_->data (), peer_id.data (), peer_id.size ());
    msg_->set_flags (msg_t::more);
    identity_sent = true;

    return 0;
}

316
int zmq::router_t::rollback (void)
317 318 319 320 321 322 323 324 325
{
    if (current_out) {
        current_out->rollback ();
        current_out = NULL;
        more_out = false;
    }
    return 0;
}

326
bool zmq::router_t::xhas_in ()
327
{
328
    //  If we are in the middle of reading the messages, there are
329 330 331 332
    //  definitely more parts available.
    if (more_in)
        return true;

333
    //  We may already have a message pre-fetched.
334
    if (prefetched)
335
        return true;
336

337 338 339
    //  Try to read the next message.
    //  The message, if read, is kept in the pre-fetch buffer.
    pipe_t *pipe = NULL;
340
    int rc = fq.recvpipe (&prefetched_msg, &pipe);
Martin Hurton's avatar
Martin Hurton committed
341 342 343 344 345 346 347 348

    //  It's possible that we receive peer's identity. That happens
    //  after reconnection. The current implementation assumes that
    //  the peer always uses the same identity.
    //  TODO: handle the situation when the peer changes its identity.
    while (rc == 0 && prefetched_msg.is_identity ())
        rc = fq.recvpipe (&prefetched_msg, &pipe);

349
    if (rc != 0)
350
        return false;
351

Martin Hurton's avatar
Martin Hurton committed
352
    zmq_assert (pipe != NULL);
353 354 355 356 357 358 359 360 361

    blob_t identity = pipe->get_identity ();
    rc = prefetched_id.init_size (identity.size ());
    errno_assert (rc == 0);
    memcpy (prefetched_id.data (), identity.data (), identity.size ());
    prefetched_id.set_flags (msg_t::more);

    prefetched = true;
    identity_sent = false;
362

363
    return true;
364 365
}

366
//  Reports whether the socket can be written to; always true.
bool zmq::router_t::xhas_out ()
{
    //  In theory, a ROUTER socket is always ready for writing. Whether an
    //  actual attempt to write succeeds depends on which pipe the message
    //  is going to be routed to.
    return true;
}

374 375 376 377
//  Determines the identity of the peer behind pipe_ and registers the pipe
//  in outpipes under that identity.
//
//  For raw sockets, or when the peer sent an empty identity, a 5-byte
//  identity is generated: a zero byte followed by next_peer_id (which is
//  then incremented). Otherwise the identity is taken from the first
//  message read from the pipe.
//
//  Returns false when no message can be read from the pipe yet, or when the
//  identity is already in use and 'handover' is disabled. Returns true once
//  the pipe has been registered.
bool zmq::router_t::identify_peer (pipe_t *pipe_)
{
    msg_t msg;
    blob_t identity;
    bool ok;

    if (options.raw_sock) {
        //  Always assign identity for raw-socket
        unsigned char raw_id [5];
        raw_id [0] = 0;
        put_uint32 (raw_id + 1, next_peer_id++);
        identity = blob_t (raw_id, sizeof raw_id);
    }
    else {
        msg.init ();
        ok = pipe_->read (&msg);
        if (!ok)
            return false;

        if (msg.size () == 0) {
            //  Fall back on the auto-generation
            unsigned char auto_id [5];
            auto_id [0] = 0;
            put_uint32 (auto_id + 1, next_peer_id++);
            identity = blob_t (auto_id, sizeof auto_id);
            msg.close ();
        }
        else {
            identity = blob_t ((unsigned char*) msg.data (), msg.size ());
            outpipes_t::iterator clash = outpipes.find (identity);
            msg.close ();

            if (clash != outpipes.end ()) {
                //  Ignore peers with duplicate ID, unless handover is on.
                if (!handover)
                    return false;

                //  We will allow the new connection to take over this
                //  identity. Temporarily assign a new identity to the
                //  existing pipe so we can terminate it asynchronously.
                unsigned char spare_id [5];
                spare_id [0] = 0;
                put_uint32 (spare_id + 1, next_peer_id++);
                blob_t new_identity = blob_t (spare_id, sizeof spare_id);

                clash->second.pipe->set_identity (new_identity);
                outpipe_t displaced =
                    {clash->second.pipe, clash->second.active};

                ok = outpipes.insert (outpipes_t::value_type (
                    new_identity, displaced)).second;
                zmq_assert (ok);

                //  Remove the existing identity entry to allow the new
                //  connection to take the identity.
                outpipes.erase (clash);

                displaced.pipe->terminate (true);
            }
        }
    }

    pipe_->set_identity (identity);

    //  Add the record into output pipes lookup table
    outpipe_t record = {pipe_, true};
    ok = outpipes.insert (outpipes_t::value_type (identity, record)).second;
    zmq_assert (ok);

    return true;
}