req.cpp 7.28 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
3 4 5 6

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
7
    the terms of the GNU Lesser General Public License as published by
8 9 10 11 12 13
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
    GNU Lesser General Public License for more details.
15

16
    You should have received a copy of the GNU Lesser General Public License
17 18 19 20 21
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "req.hpp"
#include "err.hpp"
22
#include "msg.hpp"
23 24 25
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
26

27
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
28
    dealer_t (parent_, tid_, sid_),
29
    receiving_reply (false),
30
    message_begins (true),
31 32
    reply_pipe (NULL),
    request_id_frames_enabled (false),
33
    request_id (generate_random()),
34
    strict (true)
35
{
36
    options.type = ZMQ_REQ;
37 38 39 40
}

zmq::req_t::~req_t ()
{
Martin Hurton's avatar
Martin Hurton committed
41 42
}

43
int zmq::req_t::xsend (msg_t *msg_)
44 45
{
    //  If we've sent a request and we still haven't got the reply,
46
    //  we can't send another request unless the strict option is disabled.
47
    if (receiving_reply) {
48
        if (strict) {
49 50 51 52 53 54 55 56
            errno = EFSM;
            return -1;
        }

        if (reply_pipe)
            reply_pipe->terminate (false);
        receiving_reply = false;
        message_begins = true;
57 58
    }

59
    //  First part of the request is the request identity.
60
    if (message_begins) {
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
        reply_pipe = NULL;

        if (request_id_frames_enabled) {
            request_id++;

            msg_t id;
            int rc = id.init_data (&request_id, sizeof (request_id), NULL, NULL);
            errno_assert (rc == 0);
            id.set_flags (msg_t::more);

            rc = dealer_t::sendpipe (&id, &reply_pipe);
            if (rc != 0)
                return -1;
        }

76 77
        msg_t bottom;
        int rc = bottom.init ();
78
        errno_assert (rc == 0);
79
        bottom.set_flags (msg_t::more);
80 81

        rc = dealer_t::sendpipe (&bottom, &reply_pipe);
82
        if (rc != 0)
83
            return -1;
84
        zmq_assert (reply_pipe);
85

86
        message_begins = false;
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101

        // Eat all currently avaliable messages before the request is fully
        // sent. This is done to avoid:
        //   REQ sends request to A, A replies, B replies too.
        //   A's reply was first and matches, that is used.
        //   An hour later REQ sends a request to B. B's old reply is used.
        msg_t drop;
        while (true) {
            rc = drop.init ();
            errno_assert (rc == 0);
            rc = dealer_t::xrecv (&drop);
            if (rc != 0)
                break;
            drop.close ();
        }
102 103
    }

104
    bool more = msg_->flags () & msg_t::more ? true : false;
105

106
    int rc = dealer_t::xsend (msg_);
107 108
    if (rc != 0)
        return rc;
109

110 111 112 113
    //  If the request was fully sent, flip the FSM into reply-receiving state.
    if (!more) {
        receiving_reply = true;
        message_begins = true;
114
    }
115 116 117 118

    return 0;
}

119
int zmq::req_t::xrecv (msg_t *msg_)
120 121
{
    //  If request wasn't send, we can't wait for reply.
122
    if (!receiving_reply) {
123
        errno = EFSM;
124 125 126
        return -1;
    }

127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
    //  Skip messages until one with the right first frames is found.
    while (message_begins) {
        //  If enabled, the first frame must have the correct request_id.
        if (request_id_frames_enabled) {
            int rc = recv_reply_pipe (msg_);
            if (rc != 0)
                return rc;

            if (unlikely (!(msg_->flags () & msg_t::more) ||
                          msg_->size () != sizeof (request_id) ||
                          *static_cast<uint32_t *> (msg_->data ()) != request_id)) {
                //  Skip the remaining frames and try the next message
                while (msg_->flags () & msg_t::more) {
                    rc = recv_reply_pipe (msg_);
                    errno_assert (rc == 0);
                }
                continue;
            }
        }

        //  The next frame must be 0.
        // TODO: Failing this check should also close the connection with the peer!
149
        int rc = recv_reply_pipe (msg_);
150 151
        if (rc != 0)
            return rc;
152

153
        if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
154 155 156
            //  Skip the remaining frames and try the next message
            while (msg_->flags () & msg_t::more) {
                rc = recv_reply_pipe (msg_);
157
                errno_assert (rc == 0);
158
            }
159
            continue;
160
        }
161

162
        message_begins = false;
163
    }
164

165
    int rc = recv_reply_pipe (msg_);
166 167 168 169
    if (rc != 0)
        return rc;

    //  If the reply is fully received, flip the FSM into request-sending state.
170
    if (!(msg_->flags () & msg_t::more)) {
171
        receiving_reply = false;
172
        message_begins = true;
173 174
    }

175 176 177
    return 0;
}

178 179
bool zmq::req_t::xhas_in ()
{
180 181
    //  TODO: Duplicates should be removed here.

182
    if (!receiving_reply)
183
        return false;
184

185
    return dealer_t::xhas_in ();
186 187 188 189
}

bool zmq::req_t::xhas_out ()
{
190
    if (receiving_reply)
191 192
        return false;

193
    return dealer_t::xhas_out ();
194 195
}

196
int zmq::req_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_)
197 198 199 200
{
    bool is_int = (optvallen_ == sizeof (int));
    int value = is_int? *((int *) optval_): 0;
    switch (option_) {
201
        case ZMQ_REQ_CORRELATE:
202
            if (is_int && value >= 0) {
203
                request_id_frames_enabled = (value != 0);
204 205 206 207
                return 0;
            }
            break;

208
        case ZMQ_REQ_RELAXED:
209
            if (is_int && value >= 0) {
210
                strict = (value == 0);
211 212 213 214
                return 0;
            }
            break;

215 216 217 218
        default:
            break;
    }

219 220 221 222 223 224 225 226
    return dealer_t::xsetsockopt (option_, optval_, optvallen_);
}

void zmq::req_t::xpipe_terminated (pipe_t *pipe_)
{
    if (reply_pipe == pipe_)
        reply_pipe = NULL;
    dealer_t::xpipe_terminated (pipe_);
227 228
}

229 230 231 232
int zmq::req_t::recv_reply_pipe (msg_t *msg_)
{
    while (true) {
        pipe_t *pipe = NULL;
233
        int rc = dealer_t::recvpipe (msg_, &pipe);
234 235
        if (rc != 0)
            return rc;
236
        if (!reply_pipe || pipe == reply_pipe)
237 238 239 240
            return 0;
    }
}

241 242
zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
      socket_base_t *socket_, const options_t &options_,
243
      address_t *addr_) :
244
    session_base_t (io_thread_, connect_, socket_, options_, addr_),
245
    state (bottom)
246 247 248 249 250 251
{
}

zmq::req_session_t::~req_session_t ()
{
}
252

253
int zmq::req_session_t::push_msg (msg_t *msg_)
254
{
255 256
    switch (state) {
    case bottom:
257
        if (msg_->flags () == msg_t::more && msg_->size () == 0) {
258
            state = body;
259
            return session_base_t::push_msg (msg_);
260
        }
261 262
        break;
    case body:
263
        if (msg_->flags () == msg_t::more)
264
            return session_base_t::push_msg (msg_);
265
        if (msg_->flags () == 0) {
266
            state = bottom;
267
            return session_base_t::push_msg (msg_);
268
        }
269
        break;
270 271 272 273 274
    }
    errno = EFAULT;
    return -1;
}

Martin Hurton's avatar
Martin Hurton committed
275 276 277
void zmq::req_session_t::reset ()
{
    session_base_t::reset ();
278
    state = bottom;
Martin Hurton's avatar
Martin Hurton committed
279
}