req.cpp 7.92 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29 30 31
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "req.hpp"
#include "err.hpp"
32
#include "msg.hpp"
33 34 35
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
36

37
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
38
    dealer_t (parent_, tid_, sid_),
39
    receiving_reply (false),
40
    message_begins (true),
41 42
    reply_pipe (NULL),
    request_id_frames_enabled (false),
43
    request_id (generate_random()),
44
    strict (true)
45
{
46
    options.type = ZMQ_REQ;
47 48 49 50
}

zmq::req_t::~req_t ()
{
Martin Hurton's avatar
Martin Hurton committed
51 52
}

53
int zmq::req_t::xsend (msg_t *msg_)
54 55
{
    //  If we've sent a request and we still haven't got the reply,
56
    //  we can't send another request unless the strict option is disabled.
57
    if (receiving_reply) {
58
        if (strict) {
59 60 61 62 63 64 65 66
            errno = EFSM;
            return -1;
        }

        if (reply_pipe)
            reply_pipe->terminate (false);
        receiving_reply = false;
        message_begins = true;
67 68
    }

69
    //  First part of the request is the request identity.
70
    if (message_begins) {
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
        reply_pipe = NULL;

        if (request_id_frames_enabled) {
            request_id++;

            msg_t id;
            int rc = id.init_data (&request_id, sizeof (request_id), NULL, NULL);
            errno_assert (rc == 0);
            id.set_flags (msg_t::more);

            rc = dealer_t::sendpipe (&id, &reply_pipe);
            if (rc != 0)
                return -1;
        }

86 87
        msg_t bottom;
        int rc = bottom.init ();
88
        errno_assert (rc == 0);
89
        bottom.set_flags (msg_t::more);
90 91

        rc = dealer_t::sendpipe (&bottom, &reply_pipe);
92
        if (rc != 0)
93
            return -1;
94
        zmq_assert (reply_pipe);
95

96
        message_begins = false;
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111

        // Eat all currently avaliable messages before the request is fully
        // sent. This is done to avoid:
        //   REQ sends request to A, A replies, B replies too.
        //   A's reply was first and matches, that is used.
        //   An hour later REQ sends a request to B. B's old reply is used.
        msg_t drop;
        while (true) {
            rc = drop.init ();
            errno_assert (rc == 0);
            rc = dealer_t::xrecv (&drop);
            if (rc != 0)
                break;
            drop.close ();
        }
112 113
    }

114
    bool more = msg_->flags () & msg_t::more ? true : false;
115

116
    int rc = dealer_t::xsend (msg_);
117 118
    if (rc != 0)
        return rc;
119

120 121 122 123
    //  If the request was fully sent, flip the FSM into reply-receiving state.
    if (!more) {
        receiving_reply = true;
        message_begins = true;
124
    }
125 126 127 128

    return 0;
}

129
int zmq::req_t::xrecv (msg_t *msg_)
130 131
{
    //  If request wasn't send, we can't wait for reply.
132
    if (!receiving_reply) {
133
        errno = EFSM;
134 135 136
        return -1;
    }

137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
    //  Skip messages until one with the right first frames is found.
    while (message_begins) {
        //  If enabled, the first frame must have the correct request_id.
        if (request_id_frames_enabled) {
            int rc = recv_reply_pipe (msg_);
            if (rc != 0)
                return rc;

            if (unlikely (!(msg_->flags () & msg_t::more) ||
                          msg_->size () != sizeof (request_id) ||
                          *static_cast<uint32_t *> (msg_->data ()) != request_id)) {
                //  Skip the remaining frames and try the next message
                while (msg_->flags () & msg_t::more) {
                    rc = recv_reply_pipe (msg_);
                    errno_assert (rc == 0);
                }
                continue;
            }
        }

        //  The next frame must be 0.
        // TODO: Failing this check should also close the connection with the peer!
159
        int rc = recv_reply_pipe (msg_);
160 161
        if (rc != 0)
            return rc;
162

163
        if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
164 165 166
            //  Skip the remaining frames and try the next message
            while (msg_->flags () & msg_t::more) {
                rc = recv_reply_pipe (msg_);
167
                errno_assert (rc == 0);
168
            }
169
            continue;
170
        }
171

172
        message_begins = false;
173
    }
174

175
    int rc = recv_reply_pipe (msg_);
176 177 178 179
    if (rc != 0)
        return rc;

    //  If the reply is fully received, flip the FSM into request-sending state.
180
    if (!(msg_->flags () & msg_t::more)) {
181
        receiving_reply = false;
182
        message_begins = true;
183 184
    }

185 186 187
    return 0;
}

188 189
bool zmq::req_t::xhas_in ()
{
190 191
    //  TODO: Duplicates should be removed here.

192
    if (!receiving_reply)
193
        return false;
194

195
    return dealer_t::xhas_in ();
196 197 198 199
}

bool zmq::req_t::xhas_out ()
{
200
    if (receiving_reply)
201 202
        return false;

203
    return dealer_t::xhas_out ();
204 205
}

206
int zmq::req_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_)
207 208 209 210
{
    bool is_int = (optvallen_ == sizeof (int));
    int value = is_int? *((int *) optval_): 0;
    switch (option_) {
211
        case ZMQ_REQ_CORRELATE:
212
            if (is_int && value >= 0) {
213
                request_id_frames_enabled = (value != 0);
214 215 216 217
                return 0;
            }
            break;

218
        case ZMQ_REQ_RELAXED:
219
            if (is_int && value >= 0) {
220
                strict = (value == 0);
221 222 223 224
                return 0;
            }
            break;

225 226 227 228
        default:
            break;
    }

229 230 231 232 233 234 235 236
    return dealer_t::xsetsockopt (option_, optval_, optvallen_);
}

void zmq::req_t::xpipe_terminated (pipe_t *pipe_)
{
    if (reply_pipe == pipe_)
        reply_pipe = NULL;
    dealer_t::xpipe_terminated (pipe_);
237 238
}

239 240 241 242
int zmq::req_t::recv_reply_pipe (msg_t *msg_)
{
    while (true) {
        pipe_t *pipe = NULL;
243
        int rc = dealer_t::recvpipe (msg_, &pipe);
244 245
        if (rc != 0)
            return rc;
246
        if (!reply_pipe || pipe == reply_pipe)
247 248 249 250
            return 0;
    }
}

251 252
zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
      socket_base_t *socket_, const options_t &options_,
253
      address_t *addr_) :
254
    session_base_t (io_thread_, connect_, socket_, options_, addr_),
255
    state (bottom)
256 257 258 259 260 261
{
}

zmq::req_session_t::~req_session_t ()
{
}
262

263
int zmq::req_session_t::push_msg (msg_t *msg_)
264
{
265 266
    switch (state) {
    case bottom:
267
        if (msg_->flags () == msg_t::more && msg_->size () == 0) {
268
            state = body;
269
            return session_base_t::push_msg (msg_);
270
        }
271 272
        break;
    case body:
273
        if (msg_->flags () == msg_t::more)
274
            return session_base_t::push_msg (msg_);
275
        if (msg_->flags () == 0) {
276
            state = bottom;
277
            return session_base_t::push_msg (msg_);
278
        }
279
        break;
280 281 282 283 284
    }
    errno = EFAULT;
    return -1;
}

Martin Hurton's avatar
Martin Hurton committed
285 286 287
void zmq::req_session_t::reset ()
{
    session_base_t::reset ();
288
    state = bottom;
Martin Hurton's avatar
Martin Hurton committed
289
}