req.cpp 3.89 KB
Newer Older
1
/*
2 3
    Copyright (c) 2007-2011 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
4 5 6 7

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
8
    the terms of the GNU Lesser General Public License as published by
9 10 11 12 13 14
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
    GNU Lesser General Public License for more details.
16

17
    You should have received a copy of the GNU Lesser General Public License
18 19 20 21 22
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "req.hpp"
#include "err.hpp"
23
#include "msg.hpp"
24 25 26
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
27

Martin Sustrik's avatar
Martin Sustrik committed
28 29
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
    xreq_t (parent_, tid_),
30
    receiving_reply (false),
31
    message_begins (true),
32
    request_id (generate_random ())
33
{
34
    options.type = ZMQ_REQ;
35 36 37 38
}

zmq::req_t::~req_t ()
{
Martin Hurton's avatar
Martin Hurton committed
39 40
}

41
int zmq::req_t::xsend (msg_t *msg_, int flags_)
42 43 44
{
    //  If we've sent a request and we still haven't got the reply,
    //  we can't send another request.
45
    if (receiving_reply) {
46
        errno = EFSM;
47 48 49
        return -1;
    }

50
    //  First part of the request is the request identity.
51
    if (message_begins) {
52
        msg_t prefix;
53
        int rc = prefix.init_size (4);
54
        errno_assert (rc == 0);
55
        prefix.set_flags (msg_t::label);
56 57
        unsigned char *data = (unsigned char*) prefix.data ();
        put_uint32 (data, request_id);
58 59 60 61
        rc = xreq_t::xsend (&prefix, flags_);
        if (rc != 0)
            return rc;
        message_begins = false;
62 63
    }

Martin Sustrik's avatar
Martin Sustrik committed
64
    bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
65

66 67 68
    int rc = xreq_t::xsend (msg_, flags_);
    if (rc != 0)
        return rc;
69

70 71 72 73
    //  If the request was fully sent, flip the FSM into reply-receiving state.
    if (!more) {
        receiving_reply = true;
        message_begins = true;
74
    }
75 76 77 78

    return 0;
}

79
int zmq::req_t::xrecv (msg_t *msg_, int flags_)
80 81
{
    //  If request wasn't send, we can't wait for reply.
82
    if (!receiving_reply) {
83
        errno = EFSM;
84 85 86
        return -1;
    }

87
    //  First part of the reply should be the original request ID.
88 89 90 91
    if (message_begins) {
        int rc = xreq_t::xrecv (msg_, flags_);
        if (rc != 0)
            return rc;
92 93 94 95 96 97 98

        // TODO: This should also close the connection with the peer!
        if (unlikely (!(msg_->flags () & msg_t::label) || msg_->size () != 4)) {
            errno = EAGAIN;
            return -1;
        }
        
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
        unsigned char *data = (unsigned char*) msg_->data ();
        if (unlikely (get_uint32 (data) != request_id)) {

            //  The request ID does not match. Drop the entire message.
            while (true) {
                int rc = xreq_t::xrecv (msg_, flags_);
                errno_assert (rc == 0);
                if (!(msg_->flags () & (msg_t::label | msg_t::more)))
                    break;
            }
            msg_->close ();
            msg_->init ();
            errno = EAGAIN;
            return -1;
        }
114
        message_begins = false;
115
    }
116

117 118 119 120 121
    int rc = xreq_t::xrecv (msg_, flags_);
    if (rc != 0)
        return rc;

    //  If the reply is fully received, flip the FSM into request-sending state.
122
    if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
123
        request_id++;
124
        receiving_reply = false;
125
        message_begins = true;
126 127
    }

128 129 130
    return 0;
}

131 132
bool zmq::req_t::xhas_in ()
{
133 134
    //  TODO: Duplicates should be removed here.

135
    if (!receiving_reply)
136
        return false;
137

138
    return xreq_t::xhas_in ();
139 140 141 142
}

bool zmq::req_t::xhas_out ()
{
143
    if (receiving_reply)
144 145
        return false;

146
    return xreq_t::xhas_out ();
147 148
}

149