/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32 33
#include "req.hpp"
#include "err.hpp"
34
#include "msg.hpp"
35 36 37
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
38

39
//  Builds a REQ socket on top of dealer_t.
//
//  Initial state: no reply is awaited, the next frame sent starts a new
//  request, no reply pipe is recorded, request-id frames are off, strict
//  request/reply alternation is on, and the request id counter starts from
//  a value obtained from generate_random ().
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    dealer_t (parent_, tid_, sid_),
    _receiving_reply (false),
    _message_begins (true),
    _reply_pipe (NULL),
    _request_id_frames_enabled (false),
    _request_id (generate_random ()),
    _strict (true)
{
    //  Advertise the socket type through the shared options structure.
    options.type = ZMQ_REQ;
}

//  The destructor body is intentionally empty; req_t performs no cleanup
//  of its own here.
zmq::req_t::~req_t ()
{
}

55
//  Sends one frame of a request.
//
//  When the frame is the first one of a new request, an envelope is sent
//  ahead of it: an optional 4-byte request-id frame (only if
//  _request_id_frames_enabled is set) followed by an empty delimiter frame,
//  both flagged with msg_t::more. dealer_t::sendpipe is handed &_reply_pipe
//  for these frames, and _reply_pipe is asserted to be non-NULL afterwards.
//  Any messages already waiting to be received are then read and discarded.
//
//  Returns 0 on success. Returns -1 with errno == EFSM if a reply is still
//  awaited and the socket is in strict mode. Returns -1 if sending an
//  envelope frame fails, or the return code of dealer_t::xsend if sending
//  the user frame fails.
int zmq::req_t::xsend (msg_t *msg_)
{
    //  If we've sent a request and we still haven't got the reply,
    //  we can't send another request unless the strict option is disabled.
    if (_receiving_reply) {
        if (_strict) {
            errno = EFSM;
            return -1;
        }

        //  Relaxed mode: give up on the outstanding reply and start a new
        //  request from scratch.
        _receiving_reply = false;
        _message_begins = true;
    }

    //  First part of the request is the request routing id.
    if (_message_begins) {
        _reply_pipe = NULL;

        if (_request_id_frames_enabled) {
            _request_id++;

            msg_t id;
            int rc = id.init_size (sizeof (uint32_t));
            //  Verify the message was initialised before writing into its
            //  payload.
            errno_assert (rc == 0);
            memcpy (id.data (), &_request_id, sizeof (uint32_t));
            id.set_flags (msg_t::more);

            rc = dealer_t::sendpipe (&id, &_reply_pipe);
            if (rc != 0) {
                return -1;
            }
        }

        //  Empty delimiter frame separating the envelope from the body.
        msg_t bottom;
        int rc = bottom.init ();
        errno_assert (rc == 0);
        bottom.set_flags (msg_t::more);

        rc = dealer_t::sendpipe (&bottom, &_reply_pipe);
        if (rc != 0)
            return -1;
        zmq_assert (_reply_pipe);

        _message_begins = false;

        // Eat all currently available messages before the request is fully
        // sent. This is done to avoid:
        //   REQ sends request to A, A replies, B replies too.
        //   A's reply was first and matches, that is used.
        //   An hour later REQ sends a request to B. B's old reply is used.
        msg_t drop;
        while (true) {
            rc = drop.init ();
            errno_assert (rc == 0);
            rc = dealer_t::xrecv (&drop);
            if (rc != 0)
                break;
            drop.close ();
        }
    }

    //  Remember the flag now; msg_ is handed over to dealer_t::xsend below.
    bool more = (msg_->flags () & msg_t::more) != 0;

    int rc = dealer_t::xsend (msg_);
    if (rc != 0)
        return rc;

    //  If the request was fully sent, flip the FSM into reply-receiving state.
    if (!more) {
        _receiving_reply = true;
        _message_begins = true;
    }

    return 0;
}

131
//  Receives one frame of the reply to the outstanding request.
//
//  At the start of a reply the envelope is validated and consumed: when
//  request-id frames are enabled, the first frame must carry the 'more'
//  flag, be sizeof (_request_id) bytes long and equal _request_id; the next
//  frame must be an empty frame carrying the 'more' flag. A message that
//  fails either check is read to its end and discarded, and the next
//  message is tried.
//
//  Returns 0 once a reply frame has been received into msg_. Returns -1
//  with errno == EFSM if no reply is currently awaited. Otherwise returns
//  the non-zero code reported by recv_reply_pipe.
int zmq::req_t::xrecv (msg_t *msg_)
{
    //  If request wasn't sent, we can't wait for reply.
    if (!_receiving_reply) {
        errno = EFSM;
        return -1;
    }

    //  Skip messages until one with the right first frames is found.
    while (_message_begins) {
        //  If enabled, the first frame must have the correct request_id.
        if (_request_id_frames_enabled) {
            int rc = recv_reply_pipe (msg_);
            if (rc != 0)
                return rc;

            //  Copy the id out with memcpy (mirroring how xsend writes it)
            //  so that no alignment of the frame payload is assumed. The
            //  payload is only read once flags and size have been checked.
            bool id_matches = false;
            if ((msg_->flags () & msg_t::more)
                && msg_->size () == sizeof (_request_id)) {
                uint32_t received_id;
                memcpy (&received_id, msg_->data (), sizeof (received_id));
                id_matches = (received_id == _request_id);
            }

            if (unlikely (!id_matches)) {
                //  Skip the remaining frames and try the next message
                while (msg_->flags () & msg_t::more) {
                    rc = recv_reply_pipe (msg_);
                    errno_assert (rc == 0);
                }
                continue;
            }
        }

        //  The next frame must be 0.
        // TODO: Failing this check should also close the connection with the peer!
        int rc = recv_reply_pipe (msg_);
        if (rc != 0)
            return rc;

        if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
            //  Skip the remaining frames and try the next message
            while (msg_->flags () & msg_t::more) {
                rc = recv_reply_pipe (msg_);
                errno_assert (rc == 0);
            }
            continue;
        }

        _message_begins = false;
    }

    const int rc = recv_reply_pipe (msg_);
    if (rc != 0)
        return rc;

    //  If the reply is fully received, flip the FSM into request-sending state.
    if (!(msg_->flags () & msg_t::more)) {
        _receiving_reply = false;
        _message_begins = true;
    }

    return 0;
}

191 192
bool zmq::req_t::xhas_in ()
{
193 194
    //  TODO: Duplicates should be removed here.

195
    if (!_receiving_reply)
196
        return false;
197

198
    return dealer_t::xhas_in ();
199 200 201 202
}

bool zmq::req_t::xhas_out ()
{
203
    if (_receiving_reply && _strict)
204 205
        return false;

206
    return dealer_t::xhas_out ();
207 208
}

209 210 211
int zmq::req_t::xsetsockopt (int option_,
                             const void *optval_,
                             size_t optvallen_)
212
{
213
    const bool is_int = (optvallen_ == sizeof (int));
214
    int value = 0;
215 216
    if (is_int)
        memcpy (&value, optval_, sizeof (int));
217

218
    switch (option_) {
219
        case ZMQ_REQ_CORRELATE:
220
            if (is_int && value >= 0) {
221
                _request_id_frames_enabled = (value != 0);
222 223 224 225
                return 0;
            }
            break;

226
        case ZMQ_REQ_RELAXED:
227
            if (is_int && value >= 0) {
228
                _strict = (value == 0);
229 230 231 232
                return 0;
            }
            break;

233 234 235 236
        default:
            break;
    }

237 238 239 240 241
    return dealer_t::xsetsockopt (option_, optval_, optvallen_);
}

//  Called when a pipe is terminated. Clears _reply_pipe if it refers to the
//  terminated pipe, then passes the notification on to dealer_t.
void zmq::req_t::xpipe_terminated (pipe_t *pipe_)
{
    const bool was_reply_pipe = (pipe_ == _reply_pipe);
    if (was_reply_pipe)
        _reply_pipe = NULL;

    dealer_t::xpipe_terminated (pipe_);
}

247 248 249 250
//  Receives frames through dealer_t::recvpipe until one arrives from the
//  recorded reply pipe. If no reply pipe is recorded (_reply_pipe is NULL),
//  the first frame received is accepted.
//
//  Returns 0 with the accepted frame in msg_, or the non-zero code returned
//  by dealer_t::recvpipe.
int zmq::req_t::recv_reply_pipe (msg_t *msg_)
{
    pipe_t *source = NULL;
    do {
        source = NULL;
        const int rc = dealer_t::recvpipe (msg_, &source);
        if (rc != 0)
            return rc;
        //  Keep reading while a reply pipe is set and this frame came from
        //  a different pipe.
    } while (_reply_pipe && source != _reply_pipe);

    return 0;
}

259 260 261 262 263
//  Builds a REQ session. All arguments are forwarded unchanged to
//  session_base_t; the frame-checking state machine starts in 'bottom'.
zmq::req_session_t::req_session_t (io_thread_t *io_thread_,
                                   bool connect_,
                                   socket_base_t *socket_,
                                   const options_t &options_,
                                   address_t *addr_) :
    session_base_t (io_thread_, connect_, socket_, options_, addr_),
    _state (bottom)
{
}

//  The destructor body is intentionally empty; req_session_t performs no
//  cleanup of its own here.
zmq::req_session_t::~req_session_t ()
{
}
272

273
//  Checks an incoming frame against the expected reply layout and, if it
//  fits, forwards it to session_base_t::push_msg.
//
//  Expected layout, tracked in _state:
//    bottom     -> either a 4-byte request-id frame (next: request_id) or
//                  an empty delimiter frame (next: body); flags must be
//                  exactly msg_t::more.
//    request_id -> an empty delimiter frame with flags exactly msg_t::more
//                  (next: body).
//    body       -> frames with flags exactly msg_t::more keep the state; a
//                  frame with no flags ends the message (next: bottom).
//
//  Command frames are ignored and yield 0. A frame that does not fit the
//  layout yields -1 with errno == EFAULT. Otherwise the result of
//  session_base_t::push_msg is returned.
int zmq::req_session_t::push_msg (msg_t *msg_)
{
    //  Ignore commands, they are processed by the engine and should not
    //  affect the state machine.
    if (unlikely (msg_->flags () & msg_t::command))
        return 0;

    const bool only_more_flag = (msg_->flags () == msg_t::more);
    bool accepted = false;

    switch (_state) {
        case bottom:
            if (only_more_flag) {
                //  In case option ZMQ_REQ_CORRELATE is on, allow request_id
                //  to be transferred as first frame (would be too cumbersome
                //  to check whether the option is actually on or not).
                if (msg_->size () == sizeof (uint32_t)) {
                    _state = request_id;
                    accepted = true;
                } else if (msg_->size () == 0) {
                    _state = body;
                    accepted = true;
                }
            }
            break;
        case request_id:
            if (only_more_flag && msg_->size () == 0) {
                _state = body;
                accepted = true;
            }
            break;
        case body:
            if (only_more_flag) {
                accepted = true;
            } else if (msg_->flags () == 0) {
                _state = bottom;
                accepted = true;
            }
            break;
    }

    if (!accepted) {
        errno = EFAULT;
        return -1;
    }

    return session_base_t::push_msg (msg_);
}

Martin Hurton's avatar
Martin Hurton committed
315 316 317
void zmq::req_session_t::reset ()
{
    session_base_t::reset ();
318
    _state = bottom;
Martin Hurton's avatar
Martin Hurton committed
319
}