router.cpp 11.3 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
3 4 5 6

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
7
    the terms of the GNU Lesser General Public License as published by
8 9 10 11 12 13
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
    GNU Lesser General Public License for more details.
15

16
    You should have received a copy of the GNU Lesser General Public License
17 18 19
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "router.hpp"
21
#include "pipe.hpp"
22 23
#include "wire.hpp"
#include "random.hpp"
24
#include "likely.hpp"
25
#include "err.hpp"
26

27
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
28
    socket_base_t (parent_, tid_, sid_),
29 30
    prefetched (false),
    identity_sent (false),
31 32
    more_in (false),
    current_out (NULL),
33
    more_out (false),
34
    next_peer_id (generate_random ()),
35
    mandatory (false),
36 37
    //  raw_sock functionality in ROUTER is deprecated
    raw_sock (false),       
38
    probe_router (false)
39
{
40
    options.type = ZMQ_ROUTER;
41
    options.recv_identity = true;
42
    options.raw_sock = false;
43

44
    prefetched_id.init ();
45
    prefetched_msg.init ();
46 47
}

48
zmq::router_t::~router_t ()
49
{
50
    zmq_assert (anonymous_pipes.empty ());;
51
    zmq_assert (outpipes.empty ());
52
    prefetched_id.close ();
53
    prefetched_msg.close ();
54 55
}

56
void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
57
{
58 59
    // subscribe_to_all_ is unused
    (void)subscribe_to_all_;
60

61 62
    zmq_assert (pipe_);

63 64 65 66 67 68 69 70 71 72 73 74 75
    if (probe_router) {
        msg_t probe_msg_;
        int rc = probe_msg_.init ();
        errno_assert (rc == 0);

        rc = pipe_->write (&probe_msg_);
        // zmq_assert (rc) is not applicable here, since it is not a bug.
        pipe_->flush ();

        rc = probe_msg_.close ();
        errno_assert (rc == 0);
    }

76 77 78 79 80
    bool identity_ok = identify_peer (pipe_);
    if (identity_ok)
        fq.attach (pipe_);
    else
        anonymous_pipes.insert (pipe_);
81 82
}

83
int zmq::router_t::xsetsockopt (int option_, const void *optval_,
84 85
    size_t optvallen_)
{
86 87 88 89 90 91
    bool is_int = (optvallen_ == sizeof (int));
    int value = is_int? *((int *) optval_): 0;

    switch (option_) {
        case ZMQ_ROUTER_RAW:
            if (is_int && value >= 0) {
92
                raw_sock = (value != 0);
93 94 95 96 97 98 99
                if (raw_sock) {
                    options.recv_identity = false;
                    options.raw_sock = true;
                }
                return 0;
            }
            break;
100

101 102
        case ZMQ_ROUTER_MANDATORY:
            if (is_int && value >= 0) {
103
                mandatory = (value != 0);
104 105 106
                return 0;
            }
            break;
107

108
        case ZMQ_PROBE_ROUTER:
109
            if (is_int && value >= 0) {
110
                probe_router = (value != 0);
111 112 113
                return 0;
            }
            break;
114

115 116
        default:
            break;
117
    }
118 119
    errno = EINVAL;
    return -1;
120 121
}

122

123
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
124
{
125 126 127 128 129 130 131
    std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
    if (it != anonymous_pipes.end ())
        anonymous_pipes.erase (it);
    else {
        outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
        zmq_assert (it != outpipes.end ());
        outpipes.erase (it);
132
        fq.pipe_terminated (pipe_);
133 134
        if (pipe_ == current_out)
            current_out = NULL;
135
    }
136 137
}

138
void zmq::router_t::xread_activated (pipe_t *pipe_)
139
{
140 141 142 143 144
    std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
    if (it == anonymous_pipes.end ())
        fq.activated (pipe_);
    else {
        bool identity_ok = identify_peer (pipe_);
145
        if (identity_ok) {
146
            anonymous_pipes.erase (it);
147 148
            fq.attach (pipe_);
        }
149
    }
150 151
}

152
void zmq::router_t::xwrite_activated (pipe_t *pipe_)
Martin Hurton's avatar
Martin Hurton committed
153
{
154 155 156 157 158 159 160 161
    outpipes_t::iterator it;
    for (it = outpipes.begin (); it != outpipes.end (); ++it)
        if (it->second.pipe == pipe_)
            break;

    zmq_assert (it != outpipes.end ());
    zmq_assert (!it->second.active);
    it->second.active = true;
Martin Hurton's avatar
Martin Hurton committed
162 163
}

164
int zmq::router_t::xsend (msg_t *msg_)
165
{
166
    //  If this is the first part of the message it's the ID of the
167 168 169 170
    //  peer to send the message to.
    if (!more_out) {
        zmq_assert (!current_out);

171
        //  If we have malformed message (prefix with no subsequent message)
172
        //  then just silently ignore it.
173
        //  TODO: The connections should be killed instead.
174
        if (msg_->flags () & msg_t::more) {
175 176 177

            more_out = true;

178
            //  Find the pipe associated with the identity stored in the prefix.
179
            //  If there's no such pipe just silently ignore the message, unless
180
            //  router_mandatory is set.
181 182 183 184 185
            blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
            outpipes_t::iterator it = outpipes.find (identity);

            if (it != outpipes.end ()) {
                current_out = it->second.pipe;
186
                if (!current_out->check_write ()) {
187 188
                    it->second.active = false;
                    current_out = NULL;
189 190 191 192 193
                    if (mandatory) {
                        more_out = false;
                        errno = EAGAIN;
                        return -1;
                    }
194
                }
Martin Hurton's avatar
Martin Hurton committed
195 196
            }
            else
197
            if (mandatory) {
198
                more_out = false;
Pieter Hintjens's avatar
Pieter Hintjens committed
199
                errno = EHOSTUNREACH;
200
                return -1;
201
            }
202
        }
203

204 205 206 207
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
208
        return 0;
209 210
    }

Martin Hurton's avatar
Martin Hurton committed
211 212 213
    //  Ignore the MORE flag for raw-sock or assert?
    if (options.raw_sock)
        msg_->reset_flags (msg_t::more);
214

215
    //  Check whether this is the last part of the message.
216
    more_out = msg_->flags () & msg_t::more ? true : false;
217

218 219
    //  Push the message into the pipe. If there's no out pipe, just drop it.
    if (current_out) {
220 221 222 223

        // Close the remote connection if user has asked to do so
        // by sending zero length message.
        // Pending messages in the pipe will be dropped (on receiving term- ack)
Martin Hurton's avatar
Martin Hurton committed
224 225
        if (raw_sock && msg_->size() == 0) {
            current_out->terminate (false);
226 227 228 229
            int rc = msg_->close ();
            errno_assert (rc == 0);
            current_out = NULL;
            return 0;
Martin Hurton's avatar
Martin Hurton committed
230
        }
231

232
        bool ok = current_out->write (msg_);
233 234
        if (unlikely (!ok))
            current_out = NULL;
235 236
        else
        if (!more_out) {
237 238 239 240 241
            current_out->flush ();
            current_out = NULL;
        }
    }
    else {
242 243
        int rc = msg_->close ();
        errno_assert (rc == 0);
244
    }
245 246

    //  Detach the message from the data buffer.
247 248
    int rc = msg_->init ();
    errno_assert (rc == 0);
249 250

    return 0;
251 252
}

253
int zmq::router_t::xrecv (msg_t *msg_)
254
{
255 256 257 258 259 260 261 262 263 264 265
    if (prefetched) {
        if (!identity_sent) {
            int rc = msg_->move (prefetched_id);
            errno_assert (rc == 0);
            identity_sent = true;
        }
        else {
            int rc = msg_->move (prefetched_msg);
            errno_assert (rc == 0);
            prefetched = false;
        }
266
        more_in = msg_->flags () & msg_t::more ? true : false;
267 268 269
        return 0;
    }

270
    pipe_t *pipe = NULL;
271
    int rc = fq.recvpipe (msg_, &pipe);
Martin Hurton's avatar
Martin Hurton committed
272 273 274 275 276 277 278 279

    //  It's possible that we receive peer's identity. That happens
    //  after reconnection. The current implementation assumes that
    //  the peer always uses the same identity.
    while (rc == 0 && msg_->is_identity ())
        rc = fq.recvpipe (msg_, &pipe);

    if (rc != 0)
280
        return -1;
281

282
    zmq_assert (pipe != NULL);
283

284
    //  If we are in the middle of reading a message, just return the next part.
285
    if (more_in)
286
        more_in = msg_->flags () & msg_t::more ? true : false;
287 288 289 290 291 292 293 294 295 296 297 298 299 300
    else {
        //  We are at the beginning of a message.
        //  Keep the message part we have in the prefetch buffer
        //  and return the ID of the peer instead.
        rc = prefetched_msg.move (*msg_);
        errno_assert (rc == 0);
        prefetched = true;

        blob_t identity = pipe->get_identity ();
        rc = msg_->init_size (identity.size ());
        errno_assert (rc == 0);
        memcpy (msg_->data (), identity.data (), identity.size ());
        msg_->set_flags (msg_t::more);
        identity_sent = true;
301
    }
302

303
    return 0;
304 305
}

306
int zmq::router_t::rollback (void)
307 308 309 310 311 312 313 314 315
{
    if (current_out) {
        current_out->rollback ();
        current_out = NULL;
        more_out = false;
    }
    return 0;
}

316
bool zmq::router_t::xhas_in ()
317
{
318
    //  If we are in the middle of reading the messages, there are
319 320 321 322
    //  definitely more parts available.
    if (more_in)
        return true;

323
    //  We may already have a message pre-fetched.
324
    if (prefetched)
325
        return true;
326

327 328 329
    //  Try to read the next message.
    //  The message, if read, is kept in the pre-fetch buffer.
    pipe_t *pipe = NULL;
330
    int rc = fq.recvpipe (&prefetched_msg, &pipe);
Martin Hurton's avatar
Martin Hurton committed
331 332 333 334 335 336 337 338

    //  It's possible that we receive peer's identity. That happens
    //  after reconnection. The current implementation assumes that
    //  the peer always uses the same identity.
    //  TODO: handle the situation when the peer changes its identity.
    while (rc == 0 && prefetched_msg.is_identity ())
        rc = fq.recvpipe (&prefetched_msg, &pipe);

339
    if (rc != 0)
340
        return false;
341

Martin Hurton's avatar
Martin Hurton committed
342
    zmq_assert (pipe != NULL);
343 344 345 346 347 348 349 350 351

    blob_t identity = pipe->get_identity ();
    rc = prefetched_id.init_size (identity.size ());
    errno_assert (rc == 0);
    memcpy (prefetched_id.data (), identity.data (), identity.size ());
    prefetched_id.set_flags (msg_t::more);

    prefetched = true;
    identity_sent = false;
352

353
    return true;
354 355
}

356
bool zmq::router_t::xhas_out ()
357
{
358
    //  In theory, ROUTER socket is always ready for writing. Whether actual
359 360 361
    //  attempt to write succeeds depends on whitch pipe the message is going
    //  to be routed to.
    return true;
362 363
}

364 365 366 367
bool zmq::router_t::identify_peer (pipe_t *pipe_)
{
    msg_t msg;
    blob_t identity;
368
    bool ok;
369

Martin Hurton's avatar
Martin Hurton committed
370
    if (options.raw_sock) { //  Always assign identity for raw-socket
371 372 373 374
        unsigned char buf [5];
        buf [0] = 0;
        put_uint32 (buf + 1, next_peer_id++);
        identity = blob_t (buf, sizeof buf);
Martin Hurton's avatar
Martin Hurton committed
375 376
    }
    else {
377 378 379
        msg.init ();
        ok = pipe_->read (&msg);
        if (!ok)
380
            return false;
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398

        if (msg.size () == 0) {
            //  Fall back on the auto-generation
            unsigned char buf [5];
            buf [0] = 0;
            put_uint32 (buf + 1, next_peer_id++);
            identity = blob_t (buf, sizeof buf);
            msg.close ();
        }
        else {
            identity = blob_t ((unsigned char*) msg.data (), msg.size ());
            outpipes_t::iterator it = outpipes.find (identity);
            msg.close ();

            //  Ignore peers with duplicate ID.
            if (it != outpipes.end ())
                return false;
        }
399 400 401 402 403 404 405 406 407 408
    }

    pipe_->set_identity (identity);
    //  Add the record into output pipes lookup table
    outpipe_t outpipe = {pipe_, true};
    ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
    zmq_assert (ok);

    return true;
}