xrep.cpp 6.88 KB
Newer Older
1
/*
2 3
    Copyright (c) 2007-2011 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
4 5 6 7

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
8
    the terms of the GNU Lesser General Public License as published by
9 10 11 12 13 14
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
    GNU Lesser General Public License for more details.
16

17
    You should have received a copy of the GNU Lesser General Public License
18 19 20 21
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "xrep.hpp"
22
#include "pipe.hpp"
23 24
#include "wire.hpp"
#include "random.hpp"
25
#include "err.hpp"
26

Martin Sustrik's avatar
Martin Sustrik committed
27 28
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
    socket_base_t (parent_, tid_),
29
    prefetched (false),
30 31
    more_in (false),
    current_out (NULL),
32 33
    more_out (false),
    next_peer_id (generate_random ())
34
{
35
    options.type = ZMQ_XREP;
36

37 38 39 40
    //  If peer disconnect there's noone to send reply to anyway. We can drop
    //  all the outstanding requests from that peer.
    options.delay_on_disconnect = false;

41
    prefetched_msg.init ();
42 43 44 45
}

zmq::xrep_t::~xrep_t ()
{
46
    zmq_assert (outpipes.empty ());
47
    prefetched_msg.close ();
48 49
}

50
void zmq::xrep_t::xattach_pipe (pipe_t *pipe_)
51
{
52 53
    zmq_assert (pipe_);

54 55 56 57 58 59 60 61 62 63 64 65 66
    //  Generate a new peer ID. Take care to avoid duplicates.
    outpipes_t::iterator it = outpipes.lower_bound (next_peer_id);
    if (!outpipes.empty ()) {
        while (true) {
            if (it == outpipes.end ())
                it = outpipes.begin ();
            if (it->first != next_peer_id)
                break;
            ++next_peer_id;
            ++it;
        }
    }

67 68 69
    //  Add the pipe to the map out outbound pipes.
    outpipe_t outpipe = {pipe_, true};
    bool ok = outpipes.insert (outpipes_t::value_type (
70
        next_peer_id, outpipe)).second;
71 72 73
    zmq_assert (ok);

    //  Add the pipe to the list of inbound pipes.
74 75
    pipe_->set_pipe_id (next_peer_id);
    fq.attach (pipe_);
76 77 78 79 80

    //  Advance next peer ID so that if new connection is dropped shortly after
    //  its creation we don't accidentally get two subsequent peers with
    //  the same ID.
    ++next_peer_id;
81 82
}

83
void zmq::xrep_t::xterminated (pipe_t *pipe_)
84
{
85
    fq.terminated (pipe_);
86

87
    for (outpipes_t::iterator it = outpipes.begin ();
88
          it != outpipes.end (); ++it) {
89
        if (it->second.pipe == pipe_) {
90
            outpipes.erase (it);
91 92
            if (pipe_ == current_out)
                current_out = NULL;
93 94
            return;
        }
95
    }
96 97 98
    zmq_assert (false);
}

99
void zmq::xrep_t::xread_activated (pipe_t *pipe_)
100
{
101
    fq.activated (pipe_);
102 103
}

104
void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)
Martin Hurton's avatar
Martin Hurton committed
105
{
106 107
    for (outpipes_t::iterator it = outpipes.begin ();
          it != outpipes.end (); ++it) {
108
        if (it->second.pipe == pipe_) {
109 110 111 112 113 114
            zmq_assert (!it->second.active);
            it->second.active = true;
            return;
        }
    }
    zmq_assert (false);
Martin Hurton's avatar
Martin Hurton committed
115 116
}

117
int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
118
{
119
    //  If this is the first part of the message it's the ID of the
120 121 122 123
    //  peer to send the message to.
    if (!more_out) {
        zmq_assert (!current_out);

124
        //  If we have malformed message (prefix with no subsequent message)
125
        //  then just silently ignore it.
126 127
        //  TODO: The connections should be killed instead.
        if (msg_->flags () & msg_t::label) {
128 129 130

            more_out = true;

131
            //  Find the pipe associated with the peer ID stored in the prefix.
132
            //  If there's no such pipe just silently ignore the message.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
            if (msg_->size () == 4) {
                uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
                outpipes_t::iterator it = outpipes.find (peer_id);

                if (it != outpipes.end ()) {
                    current_out = it->second.pipe;
                    msg_t empty;
                    int rc = empty.init ();
                    errno_assert (rc == 0);
                    if (!current_out->check_write (&empty)) {
                        it->second.active = false;
                        more_out = false;
                        current_out = NULL;
                    }
                    rc = empty.close ();
                    errno_assert (rc == 0);
149 150
                }
            }
151
        }
152

153 154 155 156
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
157 158 159
        return 0;
    }

160
    //  Check whether this is the last part of the message.
Martin Sustrik's avatar
Martin Sustrik committed
161
    more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
162

163 164 165 166 167 168 169 170 171 172
    //  Push the message into the pipe. If there's no out pipe, just drop it.
    if (current_out) {
        bool ok = current_out->write (msg_);
        zmq_assert (ok);
        if (!more_out) {
            current_out->flush ();
            current_out = NULL;
        }
    }
    else {
173 174
        int rc = msg_->close ();
        errno_assert (rc == 0);
175
    }
176 177

    //  Detach the message from the data buffer.
178 179
    int rc = msg_->init ();
    errno_assert (rc == 0);
180 181

    return 0;
182 183
}

184
int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
185
{
186
    //  If there is a prefetched message, return it.
187
    if (prefetched) {
188 189
        int rc = msg_->move (prefetched_msg);
        errno_assert (rc == 0);
Martin Sustrik's avatar
Martin Sustrik committed
190
        more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
191 192 193 194
        prefetched = false;
        return 0;
    }

195 196 197 198 199
    //  Get next message part.
    pipe_t *pipe;
    int rc = fq.recvpipe (msg_, flags_, &pipe);
    if (rc != 0)
        return -1;
200

201
    //  If we are in the middle of reading a message, just return the next part.
202
    if (more_in) {
Martin Sustrik's avatar
Martin Sustrik committed
203
        more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
204 205
        return 0;
    }
206 207 208 209 210 211 212
 
    //  We are at the beginning of a new message. Move the message part we
    //  have to the prefetched and return the ID of the peer instead.
    rc = prefetched_msg.move (*msg_);
    errno_assert (rc == 0);
    prefetched = true;
    rc = msg_->close ();
213
    errno_assert (rc == 0);
214 215 216 217 218
    rc = msg_->init_size (4);
    errno_assert (rc == 0);
    put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
    msg_->set_flags (msg_t::label);
    return 0;
219 220
}

221 222 223 224 225 226 227 228 229 230
int zmq::xrep_t::rollback (void)
{
    if (current_out) {
        current_out->rollback ();
        current_out = NULL;
        more_out = false;
    }
    return 0;
}

231 232
bool zmq::xrep_t::xhas_in ()
{
233
    if (prefetched)
234
        return true;
235
    return fq.has_in ();
236 237 238 239
}

bool zmq::xrep_t::xhas_out ()
{
240 241 242 243
    //  In theory, XREP socket is always ready for writing. Whether actual
    //  attempt to write succeeds depends on whitch pipe the message is going
    //  to be routed to.
    return true;
244 245 246
}


247