req.cpp 9.47 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32 33
#include "req.hpp"
#include "err.hpp"
34
#include "msg.hpp"
35 36 37
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
38

39
extern "C" {
40
static void free_id (void *data_, void *hint_)
41
{
42 43
    LIBZMQ_UNUSED (hint_);
    free (data_);
44
}
45 46
}

47
//  Builds a REQ socket on top of the DEALER implementation. The socket
//  starts out in the request-sending state, at a message boundary, with no
//  reply pipe selected, request-id frames disabled, strict mode enabled and
//  the request id taken from generate_random ().
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    dealer_t (parent_, tid_, sid_),
    _receiving_reply (false),
    _message_begins (true),
    _reply_pipe (NULL),
    _request_id_frames_enabled (false),
    _request_id (generate_random ()),
    _strict (true)
{
    //  Advertise the socket type through the shared options structure.
    options.type = ZMQ_REQ;
}

//  Nothing to release here: the destructor body is intentionally empty.
zmq::req_t::~req_t ()
{
}

63
//  Sends one frame of a request.
//
//  At the beginning of a request this first emits the envelope: an optional
//  request-id frame (when request-id frames are enabled) followed by an
//  empty delimiter frame, and remembers the pipe the envelope went to in
//  _reply_pipe. It then drains every message currently readable so that a
//  stale reply cannot be mistaken for the answer to the new request.
//
//  Returns 0 on success. Returns -1 with errno set to EFSM when a reply is
//  still outstanding and strict mode is on, or -1 when an envelope frame
//  could not be sent. A non-zero result of dealer_t::xsend () is passed
//  through unchanged.
int zmq::req_t::xsend (msg_t *msg_)
{
    //  If we've sent a request and we still haven't got the reply,
    //  we can't send another request unless the strict option is disabled.
    if (_receiving_reply) {
        if (_strict) {
            errno = EFSM;
            return -1;
        }

        //  Relaxed mode: abandon the outstanding request and start over.
        _receiving_reply = false;
        _message_begins = true;
    }

    //  First part of the request is the request routing id.
    if (_message_begins) {
        _reply_pipe = NULL;

        if (_request_id_frames_enabled) {
            _request_id++;

            //  Copy request id before sending (see issue #1695 for details).
            uint32_t *request_id_copy =
              static_cast<uint32_t *> (malloc (sizeof (uint32_t)));
            zmq_assert (request_id_copy);

            *request_id_copy = _request_id;

            msg_t id;
            int rc =
              id.init_data (request_id_copy, sizeof (uint32_t), free_id, NULL);
            errno_assert (rc == 0);
            id.set_flags (msg_t::more);

            rc = dealer_t::sendpipe (&id, &_reply_pipe);
            if (rc != 0) {
                //  The frame was not handed over, so it still owns the heap
                //  copy of the request id. Close it explicitly so that
                //  free_id () runs; otherwise the copy would leak on every
                //  failed send.
                //  NOTE(review): assumes msg_t is not released automatically
                //  when it goes out of scope and that a failed sendpipe ()
                //  leaves the frame untouched - confirm against msg.hpp and
                //  the load balancer.
                //  errno is preserved so the caller still sees the reason
                //  the send failed.
                const int send_errno = errno;
                rc = id.close ();
                errno_assert (rc == 0);
                errno = send_errno;
                return -1;
            }
        }

        //  Empty delimiter frame separating the envelope from the body.
        msg_t bottom;
        int rc = bottom.init ();
        errno_assert (rc == 0);
        bottom.set_flags (msg_t::more);

        rc = dealer_t::sendpipe (&bottom, &_reply_pipe);
        if (rc != 0)
            return -1;
        zmq_assert (_reply_pipe);

        _message_begins = false;

        // Eat all currently available messages before the request is fully
        // sent. This is done to avoid:
        //   REQ sends request to A, A replies, B replies too.
        //   A's reply was first and matches, that is used.
        //   An hour later REQ sends a request to B. B's old reply is used.
        msg_t drop;
        while (true) {
            rc = drop.init ();
            errno_assert (rc == 0);
            rc = dealer_t::xrecv (&drop);
            if (rc != 0)
                break;
            drop.close ();
        }
    }

    //  Sample the flag before sending; the message is handed to
    //  dealer_t::xsend () afterwards.
    bool more = (msg_->flags () & msg_t::more) != 0;

    int rc = dealer_t::xsend (msg_);
    if (rc != 0)
        return rc;

    //  If the request was fully sent, flip the FSM into reply-receiving state.
    if (!more) {
        _receiving_reply = true;
        _message_begins = true;
    }

    return 0;
}

145
//  Receives one frame of the reply to the outstanding request.
//
//  At the start of a reply the envelope is validated: when request-id
//  frames are enabled the first frame must carry the current request id,
//  and the following frame must be an empty delimiter. Messages that fail
//  either check are skipped in full and the next message is tried.
//
//  Returns 0 on success, -1 with errno set to EFSM when no request is
//  outstanding, or the non-zero result of recv_reply_pipe () when a frame
//  could not be read.
int zmq::req_t::xrecv (msg_t *msg_)
{
    //  Without an outstanding request there is no reply to wait for.
    if (!_receiving_reply) {
        errno = EFSM;
        return -1;
    }

    //  Discard whole messages until one starts with the expected frames.
    while (_message_begins) {
        if (_request_id_frames_enabled) {
            //  Leading frame: must be followed by more frames, have the
            //  size of the request id and hold the current request id.
            const int id_rc = recv_reply_pipe (msg_);
            if (id_rc != 0)
                return id_rc;

            const bool id_has_more = (msg_->flags () & msg_t::more) != 0;
            //  The payload is only read once its size is known to match.
            const bool id_matches =
              msg_->size () == sizeof (_request_id)
              && *static_cast<uint32_t *> (msg_->data ()) == _request_id;

            if (unlikely (!id_has_more || !id_matches)) {
                //  Consume the rest of this message, then try the next one.
                while (msg_->flags () & msg_t::more) {
                    const int skip_rc = recv_reply_pipe (msg_);
                    errno_assert (skip_rc == 0);
                }
                continue;
            }
        }

        //  Delimiter frame: must be empty and followed by more frames.
        // TODO: Failing this check should also close the connection with the peer!
        const int delimiter_rc = recv_reply_pipe (msg_);
        if (delimiter_rc != 0)
            return delimiter_rc;

        const bool delimiter_has_more = (msg_->flags () & msg_t::more) != 0;
        if (unlikely (!delimiter_has_more || msg_->size () != 0)) {
            //  Consume the rest of this message, then try the next one.
            while (msg_->flags () & msg_t::more) {
                const int skip_rc = recv_reply_pipe (msg_);
                errno_assert (skip_rc == 0);
            }
            continue;
        }

        _message_begins = false;
    }

    //  Envelope accepted: hand the next body frame to the caller.
    const int body_rc = recv_reply_pipe (msg_);
    if (body_rc != 0)
        return body_rc;

    //  Last frame of the reply: go back to the request-sending state.
    const bool reply_complete = (msg_->flags () & msg_t::more) == 0;
    if (reply_complete) {
        _receiving_reply = false;
        _message_begins = true;
    }

    return 0;
}

205 206
bool zmq::req_t::xhas_in ()
{
207 208
    //  TODO: Duplicates should be removed here.

209
    if (!_receiving_reply)
210
        return false;
211

212
    return dealer_t::xhas_in ();
213 214 215 216
}

bool zmq::req_t::xhas_out ()
{
217
    if (_receiving_reply && _strict)
218 219
        return false;

220
    return dealer_t::xhas_out ();
221 222
}

223 224 225
int zmq::req_t::xsetsockopt (int option_,
                             const void *optval_,
                             size_t optvallen_)
226 227
{
    bool is_int = (optvallen_ == sizeof (int));
228
    int value = 0;
229 230
    if (is_int)
        memcpy (&value, optval_, sizeof (int));
231

232
    switch (option_) {
233
        case ZMQ_REQ_CORRELATE:
234
            if (is_int && value >= 0) {
235
                _request_id_frames_enabled = (value != 0);
236 237 238 239
                return 0;
            }
            break;

240
        case ZMQ_REQ_RELAXED:
241
            if (is_int && value >= 0) {
242
                _strict = (value == 0);
243 244 245 246
                return 0;
            }
            break;

247 248 249 250
        default:
            break;
    }

251 252 253 254 255
    return dealer_t::xsetsockopt (option_, optval_, optvallen_);
}

//  Called when a pipe goes away. Forgets the reply pipe if it is the one
//  being terminated, then lets dealer_t do its own bookkeeping.
void zmq::req_t::xpipe_terminated (pipe_t *pipe_)
{
    const bool was_reply_pipe = (pipe_ == _reply_pipe);
    if (was_reply_pipe)
        _reply_pipe = NULL;

    dealer_t::xpipe_terminated (pipe_);
}

261 262 263 264
//  Reads frames through dealer_t::recvpipe () until one arrives on the
//  reply pipe. When no reply pipe is set, the first frame read is accepted.
//  Returns 0 once a frame has been accepted, or the non-zero result of
//  recvpipe () as soon as a read fails.
int zmq::req_t::recv_reply_pipe (msg_t *msg_)
{
    pipe_t *source = NULL;
    do {
        source = NULL;
        const int rc = dealer_t::recvpipe (msg_, &source);
        if (rc != 0)
            return rc;
        //  Keep reading while a reply pipe is set and this frame came from
        //  a different pipe.
    } while (_reply_pipe && source != _reply_pipe);

    return 0;
}

273 274 275 276 277
//  Builds a REQ session: all arguments are forwarded to session_base_t and
//  the envelope state machine starts in the `bottom` state.
zmq::req_session_t::req_session_t (io_thread_t *io_thread_,
                                   bool connect_,
                                   socket_base_t *socket_,
                                   const options_t &options_,
                                   address_t *addr_) :
    session_base_t (io_thread_, connect_, socket_, options_, addr_),
    _state (bottom)
{
}

//  Nothing to release here: the destructor body is intentionally empty.
zmq::req_session_t::~req_session_t ()
{
}
286

287
//  Validates the framing of an incoming message before passing it on to
//  session_base_t::push_msg ().
//
//  Accepted frame sequence, tracked in _state:
//    bottom     -> optional request-id frame (uint32_t sized, `more` only)
//                  or the empty delimiter frame (`more` only)
//    request_id -> the empty delimiter frame (`more` only)
//    body       -> any number of `more`-only frames, ended by a frame with
//                  no flags, which returns the machine to `bottom`
//
//  Command frames are ignored and reported as success. Any other frame
//  fails with -1 and errno set to EFAULT. For accepted frames the result of
//  session_base_t::push_msg () is returned.
//
//  NOTE(review): _state is advanced before the result of the underlying
//  push is known - confirm that a failed push is never retried with the
//  same frame.
int zmq::req_session_t::push_msg (msg_t *msg_)
{
    //  Commands are handled by the engine and must not disturb the state
    //  machine.
    if (unlikely (msg_->flags () & msg_t::command))
        return 0;

    //  True when `more` is the only flag set on the frame.
    const bool more_only = msg_->flags () == msg_t::more;

    if (_state == bottom) {
        //  With ZMQ_REQ_CORRELATE the peer may send a request id as the
        //  first frame. Whether the option is on is not checked here; any
        //  frame of the right size is let through.
        if (more_only && msg_->size () == sizeof (uint32_t)) {
            _state = request_id;
            return session_base_t::push_msg (msg_);
        }
        if (more_only && msg_->size () == 0) {
            _state = body;
            return session_base_t::push_msg (msg_);
        }
    } else if (_state == request_id) {
        if (more_only && msg_->size () == 0) {
            _state = body;
            return session_base_t::push_msg (msg_);
        }
    } else if (_state == body) {
        if (more_only)
            return session_base_t::push_msg (msg_);
        if (msg_->flags () == 0) {
            //  Final frame of the message: expect a new envelope next.
            _state = bottom;
            return session_base_t::push_msg (msg_);
        }
    }

    //  The frame does not fit the expected sequence.
    errno = EFAULT;
    return -1;
}

Martin Hurton's avatar
Martin Hurton committed
329 330 331
void zmq::req_session_t::reset ()
{
    session_base_t::reset ();
332
    _state = bottom;
Martin Hurton's avatar
Martin Hurton committed
333
}