req.cpp 8.83 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31 32
#include "req.hpp"
#include "err.hpp"
33
#include "msg.hpp"
34 35 36
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
37

38 39 40 41 42 43 44 45
extern "C"
{
    static void free_id (void *data, void *hint)
    {
        free (data);
    }
}

46
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
47
    dealer_t (parent_, tid_, sid_),
48
    receiving_reply (false),
49
    message_begins (true),
50 51
    reply_pipe (NULL),
    request_id_frames_enabled (false),
52
    request_id (generate_random ()),
53
    strict (true)
54
{
55
    options.type = ZMQ_REQ;
56 57 58 59
}

zmq::req_t::~req_t ()
{
Martin Hurton's avatar
Martin Hurton committed
60 61
}

62
int zmq::req_t::xsend (msg_t *msg_)
63 64
{
    //  If we've sent a request and we still haven't got the reply,
65
    //  we can't send another request unless the strict option is disabled.
66
    if (receiving_reply) {
67
        if (strict) {
68 69 70 71 72 73
            errno = EFSM;
            return -1;
        }

        receiving_reply = false;
        message_begins = true;
74 75
    }

76
    //  First part of the request is the request identity.
77
    if (message_begins) {
78 79 80 81 82
        reply_pipe = NULL;

        if (request_id_frames_enabled) {
            request_id++;

83 84 85 86
            //  Copy request id before sending (see issue #1695 for details).
            uint32_t *request_id_copy = (uint32_t *) malloc (sizeof (uint32_t));
            *request_id_copy = request_id;

87
            msg_t id;
88 89
            int rc = id.init_data (request_id_copy, sizeof (uint32_t),
                free_id, NULL);
90 91 92 93 94 95 96 97
            errno_assert (rc == 0);
            id.set_flags (msg_t::more);

            rc = dealer_t::sendpipe (&id, &reply_pipe);
            if (rc != 0)
                return -1;
        }

98 99
        msg_t bottom;
        int rc = bottom.init ();
100
        errno_assert (rc == 0);
101
        bottom.set_flags (msg_t::more);
102 103

        rc = dealer_t::sendpipe (&bottom, &reply_pipe);
104
        if (rc != 0)
105
            return -1;
106
        zmq_assert (reply_pipe);
107

108
        message_begins = false;
109

110
        // Eat all currently available messages before the request is fully
111 112 113 114 115 116 117 118 119 120 121 122 123
        // sent. This is done to avoid:
        //   REQ sends request to A, A replies, B replies too.
        //   A's reply was first and matches, that is used.
        //   An hour later REQ sends a request to B. B's old reply is used.
        msg_t drop;
        while (true) {
            rc = drop.init ();
            errno_assert (rc == 0);
            rc = dealer_t::xrecv (&drop);
            if (rc != 0)
                break;
            drop.close ();
        }
124 125
    }

126
    bool more = msg_->flags () & msg_t::more ? true : false;
127

128
    int rc = dealer_t::xsend (msg_);
129 130
    if (rc != 0)
        return rc;
131

132 133 134 135
    //  If the request was fully sent, flip the FSM into reply-receiving state.
    if (!more) {
        receiving_reply = true;
        message_begins = true;
136
    }
137 138 139 140

    return 0;
}

141
int zmq::req_t::xrecv (msg_t *msg_)
142 143
{
    //  If request wasn't send, we can't wait for reply.
144
    if (!receiving_reply) {
145
        errno = EFSM;
146 147 148
        return -1;
    }

149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
    //  Skip messages until one with the right first frames is found.
    while (message_begins) {
        //  If enabled, the first frame must have the correct request_id.
        if (request_id_frames_enabled) {
            int rc = recv_reply_pipe (msg_);
            if (rc != 0)
                return rc;

            if (unlikely (!(msg_->flags () & msg_t::more) ||
                          msg_->size () != sizeof (request_id) ||
                          *static_cast<uint32_t *> (msg_->data ()) != request_id)) {
                //  Skip the remaining frames and try the next message
                while (msg_->flags () & msg_t::more) {
                    rc = recv_reply_pipe (msg_);
                    errno_assert (rc == 0);
                }
                continue;
            }
        }

        //  The next frame must be 0.
        // TODO: Failing this check should also close the connection with the peer!
171
        int rc = recv_reply_pipe (msg_);
172 173
        if (rc != 0)
            return rc;
174

175
        if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
176 177 178
            //  Skip the remaining frames and try the next message
            while (msg_->flags () & msg_t::more) {
                rc = recv_reply_pipe (msg_);
179
                errno_assert (rc == 0);
180
            }
181
            continue;
182
        }
183

184
        message_begins = false;
185
    }
186

187
    int rc = recv_reply_pipe (msg_);
188 189 190 191
    if (rc != 0)
        return rc;

    //  If the reply is fully received, flip the FSM into request-sending state.
192
    if (!(msg_->flags () & msg_t::more)) {
193
        receiving_reply = false;
194
        message_begins = true;
195 196
    }

197 198 199
    return 0;
}

200 201
bool zmq::req_t::xhas_in ()
{
202 203
    //  TODO: Duplicates should be removed here.

204
    if (!receiving_reply)
205
        return false;
206

207
    return dealer_t::xhas_in ();
208 209 210 211
}

bool zmq::req_t::xhas_out ()
{
212
    if (receiving_reply)
213 214
        return false;

215
    return dealer_t::xhas_out ();
216 217
}

218
int zmq::req_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_)
219 220
{
    bool is_int = (optvallen_ == sizeof (int));
221
    int value = 0;
222 223
    if (is_int)
        memcpy (&value, optval_, sizeof (int));
224

225
    switch (option_) {
226
        case ZMQ_REQ_CORRELATE:
227
            if (is_int && value >= 0) {
228
                request_id_frames_enabled = (value != 0);
229 230 231 232
                return 0;
            }
            break;

233
        case ZMQ_REQ_RELAXED:
234
            if (is_int && value >= 0) {
235
                strict = (value == 0);
236 237 238 239
                return 0;
            }
            break;

240 241 242 243
        default:
            break;
    }

244 245 246 247 248 249 250 251
    return dealer_t::xsetsockopt (option_, optval_, optvallen_);
}

void zmq::req_t::xpipe_terminated (pipe_t *pipe_)
{
    if (reply_pipe == pipe_)
        reply_pipe = NULL;
    dealer_t::xpipe_terminated (pipe_);
252 253
}

254 255 256 257
int zmq::req_t::recv_reply_pipe (msg_t *msg_)
{
    while (true) {
        pipe_t *pipe = NULL;
258
        int rc = dealer_t::recvpipe (msg_, &pipe);
259 260
        if (rc != 0)
            return rc;
261
        if (!reply_pipe || pipe == reply_pipe)
262 263 264 265
            return 0;
    }
}

266 267
zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
      socket_base_t *socket_, const options_t &options_,
268
      address_t *addr_) :
269
    session_base_t (io_thread_, connect_, socket_, options_, addr_),
270
    state (bottom)
271 272 273 274 275 276
{
}

zmq::req_session_t::~req_session_t ()
{
}
277

278
int zmq::req_session_t::push_msg (msg_t *msg_)
279
{
280 281
    switch (state) {
    case bottom:
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296
        if (msg_->flags () == msg_t::more) {
            //  In case option ZMQ_CORRELATE is on, allow request_id to be
            //  transfered as first frame (would be too cumbersome to check
            //  whether the option is actually on or not).
            if (msg_->size () == sizeof (uint32_t)) {
                state = request_id;
                return session_base_t::push_msg (msg_);
            }
            else if (msg_->size () == 0) {
                state = body;
                return session_base_t::push_msg (msg_);
            }
        }
        break;
    case request_id:
297
        if (msg_->flags () == msg_t::more && msg_->size () == 0) {
298
            state = body;
299
            return session_base_t::push_msg (msg_);
300
        }
301 302
        break;
    case body:
303
        if (msg_->flags () == msg_t::more)
304
            return session_base_t::push_msg (msg_);
305
        if (msg_->flags () == 0) {
306
            state = bottom;
307
            return session_base_t::push_msg (msg_);
308
        }
309
        break;
310 311 312 313 314
    }
    errno = EFAULT;
    return -1;
}

Martin Hurton's avatar
Martin Hurton committed
315 316 317
void zmq::req_session_t::reset ()
{
    session_base_t::reset ();
318
    state = bottom;
Martin Hurton's avatar
Martin Hurton committed
319
}