xrep.cpp 7.68 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2010 iMatix Corporation
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "../include/zmq.h"
21 22 23

#include "xrep.hpp"
#include "err.hpp"
24
#include "pipe.hpp"
25 26

//  Constructs an XREP socket owned by the given application thread.
//  Both the inbound round-robin state (current_in / more_in) and the
//  outbound routing state (current_out / more_out) start out idle.
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
    socket_base_t (parent_),
    current_in (0),
    more_in (false),
    current_out (NULL),
    more_out (false)
{
    //  Pipes must not be created at connect time: they are bound only once
    //  the initial handshake has completed, because the peer's identity has
    //  to be known when the pipes are attached (see xattach_pipes).
    options.immediate_connect = false;

    //  This socket type needs a pipe in each direction.
    options.requires_out = true;
    options.requires_in = true;
}

//  Destructor. Calls term () on every reader and writer that is still
//  registered with the socket, inbound pipes first.
zmq::xrep_t::~xrep_t ()
{
    inpipes_t::iterator in_it = inpipes.begin ();
    while (in_it != inpipes.end ()) {
        in_it->reader->term ();
        ++in_it;
    }

    outpipes_t::iterator out_it = outpipes.begin ();
    while (out_it != outpipes.end ()) {
        out_it->second.writer->term ();
        ++out_it;
    }
}

void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,
51
    class writer_t *outpipe_, const blob_t &peer_identity_)
52
{
53 54
    zmq_assert (inpipe_ && outpipe_);

55
    //  TODO: What if new connection has same peer identity as the old one?
56
    outpipe_t outpipe = {outpipe_, true};
57
    bool ok = outpipes.insert (std::make_pair (
58
        peer_identity_, outpipe)).second;
59
    zmq_assert (ok);
60 61 62

    inpipe_t inpipe = {inpipe_, peer_identity_, true};
    inpipes.push_back (inpipe);
63 64 65 66
}

void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
{
67 68 69 70 71 72 73 74 75
// TODO:!
    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
          it++) {
        if (it->reader == pipe_) {
            inpipes.erase (it);
            return;
        }
    }
    zmq_assert (false);
76 77 78 79
}

void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
{
80
    for (outpipes_t::iterator it = outpipes.begin ();
81 82
          it != outpipes.end (); ++it) {
        if (it->second.writer == pipe_) {
83
            outpipes.erase (it);
84 85
            if (pipe_ == current_out)
                current_out = NULL;
86 87
            return;
        }
88
    }
89 90 91 92 93
    zmq_assert (false);
}

void zmq::xrep_t::xkill (class reader_t *pipe_)
{
94 95 96 97 98 99 100 101 102
    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
          it++) {
        if (it->reader == pipe_) {
            zmq_assert (it->active);
            it->active = false;
            return;
        }
    }
    zmq_assert (false);
103 104 105 106
}

void zmq::xrep_t::xrevive (class reader_t *pipe_)
{
107 108 109 110 111 112 113 114 115
    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
          it++) {
        if (it->reader == pipe_) {
            zmq_assert (!it->active);
            it->active = true;
            return;
        }
    }
    zmq_assert (false);
116 117
}

Martin Hurton's avatar
Martin Hurton committed
118 119
void zmq::xrep_t::xrevive (class writer_t *pipe_)
{
120 121 122 123 124 125 126 127 128
    for (outpipes_t::iterator it = outpipes.begin ();
          it != outpipes.end (); ++it) {
        if (it->second.writer == pipe_) {
            zmq_assert (!it->second.active);
            it->second.active = true;
            return;
        }
    }
    zmq_assert (false);
Martin Hurton's avatar
Martin Hurton committed
129 130
}

131 132 133 134 135 136 137 138 139
//  Socket-type specific option handler. This implementation accepts no
//  options: every call fails with errno set to EINVAL and returns -1.
int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
    size_t optvallen_)
{
    //  The arguments are intentionally ignored.
    (void) option_;
    (void) optval_;
    (void) optvallen_;

    errno = EINVAL;
    return -1;
}

//  Sends one message part.
//
//  The first part of every outbound message is a routing prefix holding the
//  identity of the destination peer. It selects the outbound pipe and is not
//  written to any pipe itself. All subsequent parts, up to and including the
//  first one without ZMQ_MSG_MORE set, are written to the selected pipe. If
//  no pipe matches the identity, the whole message is silently dropped.
//
//  On return the caller's message is always empty: its content has either
//  been handed over to the pipe or released.
//
//  msg_    message part to send
//  flags_  not used by this implementation
//
//  Always returns 0.
int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
{
    //  If this is the first part of the message it's the identity of the
    //  peer to send the message to.
    if (!more_out) {
        zmq_assert (!current_out);

        //  There's no such thing as prefix with no subsequent message.
        zmq_assert (msg_->flags & ZMQ_MSG_MORE);
        more_out = true;

        //  Find the pipe associated with the identity stored in the prefix.
        //  If there's no such pipe, current_out stays NULL and the parts
        //  that follow are dropped.
        blob_t identity ((unsigned char*) zmq_msg_data (msg_),
            zmq_msg_size (msg_));
        outpipes_t::iterator it = outpipes.find (identity);
        if (it != outpipes.end ())
            current_out = it->second.writer;

        //  The prefix has served its purpose. Release it and hand an empty
        //  message back, same as is done for the body parts below, so that
        //  the caller sees the same state after every successful send.
        int rc = zmq_msg_close (msg_);
        zmq_assert (rc == 0);
        rc = zmq_msg_init (msg_);
        zmq_assert (rc == 0);

        return 0;
    }

    //  Check whether this is the last part of the message.
    more_out = (msg_->flags & ZMQ_MSG_MORE) != 0;

    //  Push the message into the pipe. If there's no out pipe, just drop it.
    if (current_out) {
        bool ok = current_out->write (msg_);
        zmq_assert (ok);

        //  Last part: make the whole message visible to the peer and forget
        //  the route, so that the next part is treated as a prefix again.
        if (!more_out) {
            current_out->flush ();
            current_out = NULL;
        }
    }
    else {
        int rc = zmq_msg_close (msg_);
        zmq_assert (rc == 0);
    }

    //  Detach the message from the data buffer.
    int rc = zmq_msg_init (msg_);
    zmq_assert (rc == 0);

    return 0;
}

//  Receives one message part.
//
//  Inbound pipes are polled round-robin, starting at current_in. When a pipe
//  has a message available, the first part returned to the caller is a
//  synthesised prefix holding the identity of the sending peer, flagged with
//  ZMQ_MSG_MORE. Subsequent calls return the parts read from that pipe until
//  a part without ZMQ_MSG_MORE is seen, at which point the cursor moves on
//  to the next pipe.
//
//  msg_    message to fill in; its previous content is released
//  flags_  not used by this implementation
//
//  Returns 0 on success. If no pipe has a message available, msg_ is set to
//  an empty message, errno is set to EAGAIN and -1 is returned.
int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
{
    //  Deallocate old content of the message.
    zmq_msg_close (msg_);

    if (!more_in) {

        //  No message is in progress. Visit each pipe at most once, looking
        //  for one that has something to read.
        for (inpipes_t::size_type remaining = inpipes.size ();
              remaining != 0; --remaining) {

            inpipe_t &candidate = inpipes [current_in];

            //  Inactive pipes are skipped without being queried.
            if (candidate.active && candidate.reader->check_read ()) {

                //  Hand the peer's identity to the caller as the prefix.
                //  The actual content is fetched by the following calls.
                int rc = zmq_msg_init_size (msg_,
                    inpipes [current_in].identity.size ());
                zmq_assert (rc == 0);
                memcpy (zmq_msg_data (msg_),
                    inpipes [current_in].identity.data (),
                    zmq_msg_size (msg_));
                msg_->flags = ZMQ_MSG_MORE;
                more_in = true;
                return 0;
            }

            //  If we don't have a message, move to next pipe.
            current_in++;
            if (current_in >= inpipes.size ())
                current_in = 0;
        }

        //  No message is available. Initialise the output parameter
        //  to be a 0-byte message.
        zmq_msg_init (msg_);
        errno = EAGAIN;
        return -1;
    }

    //  We are in the middle of reading a message; grab the next part of it
    //  from the pipe the prefix was generated for.
    zmq_assert (inpipes [current_in].active);
    bool fetched = inpipes [current_in].reader->read (msg_);
    zmq_assert (fetched);
    more_in = msg_->flags & ZMQ_MSG_MORE;

    //  That was the last part; the next message is taken from the next pipe.
    if (!more_in) {
        current_in++;
        if (current_in >= inpipes.size ())
            current_in = 0;
    }
    return 0;
}

bool zmq::xrep_t::xhas_in ()
{
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260
    //  There are subsequent parts of the partly-read message available.
    if (more_in)
        return true;

    //  Note that messing with current doesn't break the fairness of fair
    //  queueing algorithm. If there are no messages available current will
    //  get back to its original value. Otherwise it'll point to the first
    //  pipe holding messages, skipping only pipes with no messages available.
    for (int count = inpipes.size (); count != 0; count--) {
        if (inpipes [current_in].active &&
              inpipes [current_in].reader->check_read ())
            return true;
        current_in++;
        if (current_in >= inpipes.size ())
            current_in = 0;
    }

    return false;
261 262 263 264
}

//  Reports whether the socket is ready for writing. Always returns true.
bool zmq::xrep_t::xhas_out ()
{
    //  In theory, XREP socket is always ready for writing. Whether an actual
    //  attempt to write succeeds depends on which pipe the message is going
    //  to be routed to, and that is not known before the identity prefix
    //  is seen by xsend.
    return true;
}