stream.cpp 7.88 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "stream.hpp"
#include "pipe.hpp"
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
#include "err.hpp"

zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    socket_base_t (parent_, tid_, sid_),
    prefetched (false),
    identity_sent (false),
    current_out (NULL),
    more_out (false),
33
    next_rid (generate_random ())
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
{
    options.type = ZMQ_STREAM;
    options.raw_sock = true;

    prefetched_id.init ();
    prefetched_msg.init ();
}

zmq::stream_t::~stream_t ()
{
    zmq_assert (outpipes.empty ());
    prefetched_id.close ();
    prefetched_msg.close ();
}

49
void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
50
{
51 52
    // subscribe_to_all_ is unused
    (void)subscribe_to_all_;
53 54 55

    zmq_assert (pipe_);

56 57
    identify_peer (pipe_);
    fq.attach (pipe_);
58 59 60 61
}

void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
{
62 63 64 65 66 67
    outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
    zmq_assert (it != outpipes.end ());
    outpipes.erase (it);
    fq.pipe_terminated (pipe_);
    if (pipe_ == current_out)
        current_out = NULL;
68 69 70 71
}

void zmq::stream_t::xread_activated (pipe_t *pipe_)
{
72
    fq.activated (pipe_);
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
}

void zmq::stream_t::xwrite_activated (pipe_t *pipe_)
{
    outpipes_t::iterator it;
    for (it = outpipes.begin (); it != outpipes.end (); ++it)
        if (it->second.pipe == pipe_)
            break;

    zmq_assert (it != outpipes.end ());
    zmq_assert (!it->second.active);
    it->second.active = true;
}

int zmq::stream_t::xsend (msg_t *msg_)
{
    //  If this is the first part of the message it's the ID of the
    //  peer to send the message to.
    if (!more_out) {
        zmq_assert (!current_out);

        //  If we have malformed message (prefix with no subsequent message)
        //  then just silently ignore it.
        //  TODO: The connections should be killed instead.
        if (msg_->flags () & msg_t::more) {

            //  Find the pipe associated with the identity stored in the prefix.
            //  If there's no such pipe return an error
            blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
            outpipes_t::iterator it = outpipes.find (identity);

            if (it != outpipes.end ()) {
                current_out = it->second.pipe;
                if (!current_out->check_write ()) {
                    it->second.active = false;
                    current_out = NULL;
                    errno = EAGAIN;
                    return -1;
                }
            }
            else {
                errno = EHOSTUNREACH;
                return -1;
            }
        }

119 120 121
        //  Expect one more message frame.
        more_out = true;

122 123 124 125 126 127 128
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
        return 0;
    }

129
    //  Ignore the MORE flag
130 131
    msg_->reset_flags (msg_t::more);

132 133
    //  This is the last part of the message.
    more_out = false;
134 135 136 137 138 139 140

    //  Push the message into the pipe. If there's no out pipe, just drop it.
    if (current_out) {

        // Close the remote connection if user has asked to do so
        // by sending zero length message.
        // Pending messages in the pipe will be dropped (on receiving term- ack)
141
        if (msg_->size () == 0) {
142 143 144
            current_out->terminate (false);
            int rc = msg_->close ();
            errno_assert (rc == 0);
145 146
            rc = msg_->init ();
            errno_assert (rc == 0);
147 148 149 150
            current_out = NULL;
            return 0;
        }
        bool ok = current_out->write (msg_);
151
        if (likely (ok))
152
            current_out->flush ();
153
        current_out = NULL;
154 155 156 157 158 159 160 161 162 163 164 165
    }
    else {
        int rc = msg_->close ();
        errno_assert (rc == 0);
    }

    //  Detach the message from the data buffer.
    int rc = msg_->init ();
    errno_assert (rc == 0);

    return 0;
}
166

167 168 169 170
int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
    size_t optvallen_)
{
    switch (option_) {
171 172 173
        case ZMQ_CONNECT_RID:
            if (optval_ && optvallen_) {
                connect_rid.assign ((char*) optval_, optvallen_);
174 175 176 177 178 179 180 181 182
                return 0;
            }
            break;
        default:
            break;
    }
    errno = EINVAL;
    return -1;
}
183

184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
int zmq::stream_t::xrecv (msg_t *msg_)
{
    if (prefetched) {
        if (!identity_sent) {
            int rc = msg_->move (prefetched_id);
            errno_assert (rc == 0);
            identity_sent = true;
        }
        else {
            int rc = msg_->move (prefetched_msg);
            errno_assert (rc == 0);
            prefetched = false;
        }
        return 0;
    }

    pipe_t *pipe = NULL;
201
    int rc = fq.recvpipe (&prefetched_msg, &pipe);
202 203 204 205
    if (rc != 0)
        return -1;

    zmq_assert (pipe != NULL);
206
    zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
207

208 209 210 211 212 213 214 215
    //  We have received a frame with TCP data.
    //  Rather than sendig this frame, we keep it in prefetched
    //  buffer and send a frame with peer's ID.
    blob_t identity = pipe->get_identity ();
    rc = msg_->init_size (identity.size ());
    errno_assert (rc == 0);
    memcpy (msg_->data (), identity.data (), identity.size ());
    msg_->set_flags (msg_t::more);
216

217 218
    prefetched = true;
    identity_sent = true;
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236

    return 0;
}

bool zmq::stream_t::xhas_in ()
{
    //  We may already have a message pre-fetched.
    if (prefetched)
        return true;

    //  Try to read the next message.
    //  The message, if read, is kept in the pre-fetch buffer.
    pipe_t *pipe = NULL;
    int rc = fq.recvpipe (&prefetched_msg, &pipe);
    if (rc != 0)
        return false;

    zmq_assert (pipe != NULL);
237
    zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252

    blob_t identity = pipe->get_identity ();
    rc = prefetched_id.init_size (identity.size ());
    errno_assert (rc == 0);
    memcpy (prefetched_id.data (), identity.data (), identity.size ());
    prefetched_id.set_flags (msg_t::more);

    prefetched = true;
    identity_sent = false;

    return true;
}

bool zmq::stream_t::xhas_out ()
{
253
    //  In theory, STREAM socket is always ready for writing. Whether actual
254 255 256 257 258
    //  attempt to write succeeds depends on which pipe the message is going
    //  to be routed to.
    return true;
}

259
void zmq::stream_t::identify_peer (pipe_t *pipe_)
260 261 262 263
{
    //  Always assign identity for raw-socket
    unsigned char buffer [5];
    buffer [0] = 0;
264
    blob_t identity;
265 266 267 268
    if (connect_rid.length ()) {
        identity = blob_t ((unsigned char*) connect_rid.c_str(),
            connect_rid.length ());
        connect_rid.clear ();
269 270
        outpipes_t::iterator it = outpipes.find (identity);
        if (it != outpipes.end ()) 
271
            zmq_assert(false);
272
    }
273
    else {
274
        put_uint32 (buffer + 1, next_rid++);
275
        identity = blob_t (buffer, sizeof buffer);
276 277 278
        memcpy (options.identity, identity.data (), identity.size ());
        options.identity_size = identity.size ();
    }
279 280 281
    pipe_->set_identity (identity);
    //  Add the record into output pipes lookup table
    outpipe_t outpipe = {pipe_, true};
282 283
    const bool ok = outpipes.insert (
        outpipes_t::value_type (identity, outpipe)).second;
284 285
    zmq_assert (ok);
}