Commit b8ae850d authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #1862 from FredTreg/master

Fixed issue #1695 (ZMQ_REQ_CORRELATE)
parents 98ab7f41 e45dfe3b
......@@ -667,11 +667,9 @@ ZMQ_REQ_RELAXED: relax strict alternation between request and reply
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
By default, a REQ socket does not allow initiating a new request with
_zmq_send(3)_ until the reply to the previous one has been received.
When set to 1, sending another message is allowed and has the effect of
disconnecting the underlying connection to the peer from which the reply was
expected, triggering a reconnection attempt on transports that support it.
The request-reply state machine is reset and a new request is sent to the
next available peer.
When set to 1, sending another message is allowed and previous replies will
be discarded if any. The request-reply state machine is reset and a new
request is sent to the next available peer.
If set to 1, also enable ZMQ_REQ_CORRELATE to ensure correct matching of
requests and replies. Otherwise a late reply to an aborted request can be
......
......@@ -35,13 +35,21 @@
#include "random.hpp"
#include "likely.hpp"
extern "C"
{
static void free_id (void *data, void *hint)
{
free (data);
}
}
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
dealer_t (parent_, tid_, sid_),
receiving_reply (false),
message_begins (true),
reply_pipe (NULL),
request_id_frames_enabled (false),
request_id (generate_random()),
request_id (generate_random ()),
strict (true)
{
options.type = ZMQ_REQ;
......@@ -72,8 +80,13 @@ int zmq::req_t::xsend (msg_t *msg_)
if (request_id_frames_enabled) {
request_id++;
// Copy request id before sending (see issue #1695 for details).
uint32_t *request_id_copy = (uint32_t *) malloc (sizeof (uint32_t));
*request_id_copy = request_id;
msg_t id;
int rc = id.init_data (&request_id, sizeof (request_id), NULL, NULL);
int rc = id.init_data (request_id_copy, sizeof (uint32_t),
free_id, NULL);
errno_assert (rc == 0);
id.set_flags (msg_t::more);
......@@ -206,7 +219,8 @@ int zmq::req_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_
{
bool is_int = (optvallen_ == sizeof (int));
int value = 0;
if (is_int) memcpy(&value, optval_, sizeof (int));
if (is_int)
memcpy (&value, optval_, sizeof (int));
switch (option_) {
case ZMQ_REQ_CORRELATE:
......
......@@ -29,6 +29,31 @@
#include "testutil.hpp"
static void bounce (void *socket)
{
int more;
size_t more_size = sizeof(more);
do {
zmq_msg_t recv_part, sent_part;
int rc = zmq_msg_init (&recv_part);
assert (rc == 0);
rc = zmq_msg_recv (&recv_part, socket, 0);
assert (rc != -1);
rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
assert (rc == 0);
zmq_msg_init (&sent_part);
zmq_msg_copy (&sent_part, &recv_part);
rc = zmq_msg_send (&sent_part, socket, more ? ZMQ_SNDMORE : 0);
assert (rc != -1);
zmq_msg_close(&recv_part);
} while (more);
}
int main (void)
{
setup_test_environment();
......@@ -58,7 +83,7 @@ int main (void)
rep [peer] = zmq_socket (ctx, ZMQ_REP);
assert (rep [peer]);
int timeout = 250;
int timeout = 500;
rc = zmq_setsockopt (rep [peer], ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0);
......@@ -120,7 +145,6 @@ int main (void)
s_send_seq(req, "J", SEQ_END);
s_recv_seq(rep [0], "J", SEQ_END);
close_zero_linger (req);
for (size_t peer = 0; peer < services; peer++)
close_zero_linger (rep [peer]);
......@@ -128,6 +152,53 @@ int main (void)
// Wait for disconnects.
msleep (SETTLE_TIME);
// Case 4: Check issue #1695. As messages may pile up before a responder
// is available, we check that responses to messages other than the last
// sent one are correctly discarded by the REQ pipe
// Setup REQ socket as client
req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
rc = zmq_setsockopt (req, ZMQ_REQ_RELAXED, &enabled, sizeof (int));
assert (rc == 0);
rc = zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int));
assert (rc == 0);
rc = zmq_connect (req, "tcp://localhost:5555");
assert (rc == 0);
// Setup ROUTER socket as server but do not bind it just yet
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert(router);
int timeout = 1000;
rc = zmq_setsockopt (router, ZMQ_RCVTIMEO, &timeout, sizeof(int));
assert (rc == 0);
// Send two requests
s_send_seq (req, "TO_BE_DISCARDED", SEQ_END);
s_send_seq (req, "TO_BE_ANSWERED", SEQ_END);
// Bind server allowing it to receive messages
rc = zmq_bind(router, "tcp://127.0.0.1:5555");
assert (rc == 0);
// Read the two messages and send them back as is
bounce (router);
bounce (router);
// Read the expected correlated reply. As the ZMQ_REQ_CORRELATE is active,
// the expected answer is "TO_BE_ANSWERED", not "TO_BE_DISCARDED".
s_recv_seq (req, "TO_BE_ANSWERED", SEQ_END);
close_zero_linger (req);
close_zero_linger (router);
// Wait for disconnects.
msleep (SETTLE_TIME);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment