Commit 637f7941 authored by Ian Barber's avatar Ian Barber

Merge pull request #620 from ckamm/req-id

Add ZMQ_REQ_REQUEST_IDS option.
parents 6473dfd8 b9646f2a
...@@ -58,6 +58,7 @@ tests/test_spec_pushpull ...@@ -58,6 +58,7 @@ tests/test_spec_pushpull
tests/test_spec_rep tests/test_spec_rep
tests/test_spec_req tests/test_spec_req
tests/test_spec_router tests/test_spec_router
tests/test_req_request_ids
src/platform.hpp* src/platform.hpp*
src/stamp-h1 src/stamp-h1
perf/local_lat perf/local_lat
......
...@@ -459,6 +459,23 @@ Default value:: 0 ...@@ -459,6 +459,23 @@ Default value:: 0
Applicable socket types:: ZMQ_XPUB Applicable socket types:: ZMQ_XPUB
ZMQ_REQ_REQUEST_IDS: enable extra request identity frames
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The default behavior of REQ sockets is to rely on the ordering of messages
to match requests and responses and that is usually sufficient. When this option
is set to 1, the REQ socket will prefix outgoing messages with an extra frame
containing a request id. That means the full message is (request id, 0,
user frames...). The REQ socket will discard all incoming messages that don't
begin with these two frames.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 0
Applicable socket types:: ZMQ_REQ
ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......
...@@ -276,6 +276,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -276,6 +276,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_CURVE_SECRETKEY 49 #define ZMQ_CURVE_SECRETKEY 49
#define ZMQ_CURVE_SERVERKEY 50 #define ZMQ_CURVE_SERVERKEY 50
#define ZMQ_PROBE_ROUTER 51 #define ZMQ_PROBE_ROUTER 51
#define ZMQ_REQ_REQUEST_IDS 52
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
......
...@@ -28,7 +28,9 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -28,7 +28,9 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
dealer_t (parent_, tid_, sid_), dealer_t (parent_, tid_, sid_),
receiving_reply (false), receiving_reply (false),
message_begins (true), message_begins (true),
reply_pipe (NULL) reply_pipe (NULL),
request_id_frames_enabled (false),
request_id (generate_random())
{ {
options.type = ZMQ_REQ; options.type = ZMQ_REQ;
} }
...@@ -48,16 +50,31 @@ int zmq::req_t::xsend (msg_t *msg_) ...@@ -48,16 +50,31 @@ int zmq::req_t::xsend (msg_t *msg_)
// First part of the request is the request identity. // First part of the request is the request identity.
if (message_begins) { if (message_begins) {
reply_pipe = NULL;
if (request_id_frames_enabled) {
request_id++;
msg_t id;
int rc = id.init_data (&request_id, sizeof (request_id), NULL, NULL);
errno_assert (rc == 0);
id.set_flags (msg_t::more);
rc = dealer_t::sendpipe (&id, &reply_pipe);
if (rc != 0)
return -1;
}
msg_t bottom; msg_t bottom;
int rc = bottom.init (); int rc = bottom.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
bottom.set_flags (msg_t::more); bottom.set_flags (msg_t::more);
reply_pipe = NULL;
rc = dealer_t::sendpipe (&bottom, &reply_pipe); rc = dealer_t::sendpipe (&bottom, &reply_pipe);
if (rc != 0) if (rc != 0)
return -1; return -1;
assert (reply_pipe); assert (reply_pipe);
message_begins = false; message_begins = false;
// Eat all currently avaliable messages before the request is fully // Eat all currently avaliable messages before the request is fully
...@@ -99,24 +116,39 @@ int zmq::req_t::xrecv (msg_t *msg_) ...@@ -99,24 +116,39 @@ int zmq::req_t::xrecv (msg_t *msg_)
return -1; return -1;
} }
// First part of the reply should be the original request ID. // Skip messages until one with the right first frames is found.
if (message_begins) { while (message_begins) {
// If enabled, the first frame must have the correct request_id.
if (request_id_frames_enabled) {
int rc = recv_reply_pipe (msg_);
if (rc != 0)
return rc;
if (unlikely (!(msg_->flags () & msg_t::more) ||
msg_->size () != sizeof (request_id) ||
*static_cast<uint32_t *> (msg_->data ()) != request_id)) {
// Skip the remaining frames and try the next message
while (msg_->flags () & msg_t::more) {
rc = recv_reply_pipe (msg_);
errno_assert (rc == 0);
}
continue;
}
}
// The next frame must be 0.
// TODO: Failing this check should also close the connection with the peer!
int rc = recv_reply_pipe (msg_); int rc = recv_reply_pipe (msg_);
if (rc != 0) if (rc != 0)
return rc; return rc;
// TODO: This should also close the connection with the peer!
if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) { if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
while (true) { // Skip the remaining frames and try the next message
int rc = dealer_t::xrecv (msg_); while (msg_->flags () & msg_t::more) {
rc = recv_reply_pipe (msg_);
errno_assert (rc == 0); errno_assert (rc == 0);
if (!(msg_->flags () & msg_t::more))
break;
} }
msg_->close (); continue;
msg_->init ();
errno = EAGAIN;
return -1;
} }
message_begins = false; message_begins = false;
...@@ -153,6 +185,25 @@ bool zmq::req_t::xhas_out () ...@@ -153,6 +185,25 @@ bool zmq::req_t::xhas_out ()
return dealer_t::xhas_out (); return dealer_t::xhas_out ();
} }
int zmq::req_t::xsetsockopt(int option_, const void *optval_, size_t optvallen_)
{
bool is_int = (optvallen_ == sizeof (int));
int value = is_int? *((int *) optval_): 0;
switch (option_) {
case ZMQ_REQ_REQUEST_IDS:
if (is_int && value >= 0) {
request_id_frames_enabled = value;
return 0;
}
break;
default:
break;
}
return dealer_t::xsetsockopt(option_, optval_, optvallen_);
}
int zmq::req_t::recv_reply_pipe (msg_t *msg_) int zmq::req_t::recv_reply_pipe (msg_t *msg_)
{ {
while (true) { while (true) {
......
...@@ -43,6 +43,7 @@ namespace zmq ...@@ -43,6 +43,7 @@ namespace zmq
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
int xsetsockopt(int option_, const void *optval_, size_t optvallen_);
protected: protected:
...@@ -63,6 +64,17 @@ namespace zmq ...@@ -63,6 +64,17 @@ namespace zmq
// The pipe the request was sent to and where the reply is expected. // The pipe the request was sent to and where the reply is expected.
zmq::pipe_t *reply_pipe; zmq::pipe_t *reply_pipe;
// Whether request id frames shall be sent and expected.
bool request_id_frames_enabled;
// The current request id. It is incremented every time before a new
// request is sent.
uint32_t request_id;
// If true, send() will reset its internal state instead of failing if
// a previous request is still pending.
bool send_resets;
req_t (const req_t&); req_t (const req_t&);
const req_t &operator = (const req_t&); const req_t &operator = (const req_t&);
}; };
......
...@@ -29,7 +29,8 @@ noinst_PROGRAMS = test_pair_inproc \ ...@@ -29,7 +29,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_spec_rep \ test_spec_rep \
test_spec_dealer \ test_spec_dealer \
test_spec_router \ test_spec_router \
test_spec_pushpull test_spec_pushpull \
test_req_request_ids
if !ON_MINGW if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \ noinst_PROGRAMS += test_shutdown_stress \
...@@ -65,6 +66,7 @@ test_spec_rep_SOURCES = test_spec_rep.cpp ...@@ -65,6 +66,7 @@ test_spec_rep_SOURCES = test_spec_rep.cpp
test_spec_dealer_SOURCES = test_spec_dealer.cpp test_spec_dealer_SOURCES = test_spec_dealer.cpp
test_spec_router_SOURCES = test_spec_router.cpp test_spec_router_SOURCES = test_spec_router.cpp
test_spec_pushpull_SOURCES = test_spec_pushpull.cpp test_spec_pushpull_SOURCES = test_spec_pushpull.cpp
test_req_request_ids_SOURCES = test_req_request_ids.cpp
if !ON_MINGW if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
......
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <unistd.h>
#include <time.h>
#include "testutil.hpp"
int main (void)
{
void *ctx = zmq_ctx_new ();
assert (ctx);
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert (router);
int enabled = 1;
int rc = zmq_setsockopt (req, ZMQ_REQ_REQUEST_IDS, &enabled, sizeof (int));
assert (rc == 0);
int rcvtimeo = 100;
rc = zmq_setsockopt (req, ZMQ_RCVTIMEO, &rcvtimeo, sizeof (int));
assert (rc == 0);
rc = zmq_connect (req, "tcp://localhost:5555");
assert (rc == 0);
rc = zmq_bind (router, "tcp://*:5555");
assert (rc == 0);
// Send a multi-part request.
s_send_seq (req, "ABC", "DEF", SEQ_END);
zmq_msg_t msg;
zmq_msg_init (&msg);
// Receive peer identity
rc = zmq_msg_recv (&msg, router, 0);
assert (rc != -1);
assert (zmq_msg_size (&msg) > 0);
zmq_msg_t peer_id_msg;
zmq_msg_init (&peer_id_msg);
zmq_msg_copy (&peer_id_msg, &msg);
int more = 0;
size_t more_size = sizeof (more);
rc = zmq_getsockopt (router, ZMQ_RCVMORE, &more, &more_size);
assert (rc == 0);
assert (more);
// Receive request id 1
rc = zmq_msg_recv (&msg, router, 0);
assert (rc != -1);
assert (zmq_msg_size (&msg) == sizeof(uint32_t));
uint32_t req_id = *static_cast<uint32_t *> (zmq_msg_data (&msg));
zmq_msg_t req_id_msg;
zmq_msg_init (&req_id_msg);
zmq_msg_copy (&req_id_msg, &msg);
more = 0;
more_size = sizeof (more);
rc = zmq_getsockopt (router, ZMQ_RCVMORE, &more, &more_size);
assert (rc == 0);
assert (more);
// Receive the rest.
s_recv_seq (router, 0, "ABC", "DEF", SEQ_END);
// Send back a bad reply: correct req id
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
zmq_msg_copy (&msg, &req_id_msg);
rc = zmq_msg_send (&msg, router, 0);
assert (rc != -1);
// Send back a bad reply: wrong req id
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
uint32_t bad_req_id = req_id + 1;
zmq_msg_init_data (&msg, &bad_req_id, sizeof (uint32_t), NULL, NULL);
rc = zmq_msg_send (&msg, router, 0);
assert (rc != -1);
// Send back a bad reply: correct req id, 0
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
zmq_msg_copy (&msg, &req_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
s_send_seq (router, 0, SEQ_END);
// Send back a bad reply: correct req id, garbage
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
zmq_msg_copy (&msg, &req_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
s_send_seq (router, "FOO", SEQ_END);
// Send back a bad reply: wrong req id, 0
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
zmq_msg_init_data (&msg, &bad_req_id, sizeof (uint32_t), NULL, NULL);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
s_send_seq (router, 0, SEQ_END);
// Send back a bad reply: correct req id, garbage, data
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
zmq_msg_copy (&msg, &req_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
s_send_seq (router, "FOO", "DATA", SEQ_END);
// Send back a bad reply: wrong req id, 0, data
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
zmq_msg_init_data (&msg, &bad_req_id, sizeof (uint32_t), NULL, NULL);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
s_send_seq (router, 0, "DATA", SEQ_END);
// Send back a good reply.
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
zmq_msg_copy (&msg, &req_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
s_send_seq (router, 0, "GHI", SEQ_END);
// Receive reply. If any of the other messages got through, we wouldn't see
// this particular data.
s_recv_seq (req, "GHI", SEQ_END);
rc = zmq_msg_close (&msg);
assert (rc == 0);
rc = zmq_msg_close (&peer_id_msg);
assert (rc == 0);
rc = zmq_msg_close (&req_id_msg);
assert (rc == 0);
close_zero_linger (req);
close_zero_linger (router);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment