Commit 625b6187 authored by Frederic Tregon's avatar Frederic Tregon

Fixed ZMQ_REQ_CORRELATE (see pull request #1730)

Problem: Since pull request #1730 was merged, protocol for REQ socket is
checked at the session level and this check does not take into account
the possibility of a request_id being part of the message. Thus the option
ZMQ_REQ_CORRELATE would no longer work.
This is now fixed: the possiblity of a 4 bytes integer being present
before the delimiter frame is taken into account (whether or not this
breaks the REQ/REP RFC is another issue).
parent 0feec7a7
...@@ -732,8 +732,7 @@ check_PROGRAMS = ${test_apps} ...@@ -732,8 +732,7 @@ check_PROGRAMS = ${test_apps}
# Run the test cases # Run the test cases
TESTS = $(test_apps) TESTS = $(test_apps)
XFAIL_TESTS = tests/test_req_correlate \ XFAIL_TESTS =
tests/test_req_relaxed
if !ON_LINUX if !ON_LINUX
XFAIL_TESTS += tests/test_abstract_ipc XFAIL_TESTS += tests/test_abstract_ipc
......
...@@ -279,6 +279,21 @@ int zmq::req_session_t::push_msg (msg_t *msg_) ...@@ -279,6 +279,21 @@ int zmq::req_session_t::push_msg (msg_t *msg_)
{ {
switch (state) { switch (state) {
case bottom: case bottom:
if (msg_->flags () == msg_t::more) {
// In case option ZMQ_CORRELATE is on, allow request_id to be
// transfered as first frame (would be too cumbersome to check
// whether the option is actually on or not).
if (msg_->size () == sizeof (uint32_t)) {
state = request_id;
return session_base_t::push_msg (msg_);
}
else if (msg_->size () == 0) {
state = body;
return session_base_t::push_msg (msg_);
}
}
break;
case request_id:
if (msg_->flags () == msg_t::more && msg_->size () == 0) { if (msg_->flags () == msg_t::more && msg_->size () == 0) {
state = body; state = body;
return session_base_t::push_msg (msg_); return session_base_t::push_msg (msg_);
......
...@@ -108,6 +108,7 @@ namespace zmq ...@@ -108,6 +108,7 @@ namespace zmq
enum { enum {
bottom, bottom,
request_id,
body body
} state; } state;
......
...@@ -93,58 +93,7 @@ int main (void) ...@@ -93,58 +93,7 @@ int main (void)
// Receive the rest. // Receive the rest.
s_recv_seq (router, 0, "ABC", "DEF", SEQ_END); s_recv_seq (router, 0, "ABC", "DEF", SEQ_END);
// Send back a bad reply: correct req id
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
zmq_msg_copy (&msg, &req_id_msg);
rc = zmq_msg_send (&msg, router, 0);
assert (rc != -1);
// Send back a bad reply: wrong req id
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
uint32_t bad_req_id = req_id + 1; uint32_t bad_req_id = req_id + 1;
zmq_msg_init_data (&msg, &bad_req_id, sizeof (uint32_t), NULL, NULL);
rc = zmq_msg_send (&msg, router, 0);
assert (rc != -1);
// Send back a bad reply: correct req id, 0
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
zmq_msg_copy (&msg, &req_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
s_send_seq (router, 0, SEQ_END);
// Send back a bad reply: correct req id, garbage
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
zmq_msg_copy (&msg, &req_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
s_send_seq (router, "FOO", SEQ_END);
// Send back a bad reply: wrong req id, 0
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
zmq_msg_init_data (&msg, &bad_req_id, sizeof (uint32_t), NULL, NULL);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
s_send_seq (router, 0, SEQ_END);
// Send back a bad reply: correct req id, garbage, data
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
zmq_msg_copy (&msg, &req_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
s_send_seq (router, "FOO", "DATA", SEQ_END);
// Send back a bad reply: wrong req id, 0, data // Send back a bad reply: wrong req id, 0, data
zmq_msg_copy (&msg, &peer_id_msg); zmq_msg_copy (&msg, &peer_id_msg);
...@@ -155,7 +104,7 @@ int main (void) ...@@ -155,7 +104,7 @@ int main (void)
assert (rc != -1); assert (rc != -1);
s_send_seq (router, 0, "DATA", SEQ_END); s_send_seq (router, 0, "DATA", SEQ_END);
// Send back a good reply. // Send back a good reply: good req id, 0, data
zmq_msg_copy (&msg, &peer_id_msg); zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE); rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1); assert (rc != -1);
...@@ -164,7 +113,7 @@ int main (void) ...@@ -164,7 +113,7 @@ int main (void)
assert (rc != -1); assert (rc != -1);
s_send_seq (router, 0, "GHI", SEQ_END); s_send_seq (router, 0, "GHI", SEQ_END);
// Receive reply. If any of the other messages got through, we wouldn't see // Receive reply. If bad reply got through, we wouldn't see
// this particular data. // this particular data.
s_recv_seq (req, "GHI", SEQ_END); s_recv_seq (req, "GHI", SEQ_END);
......
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
static void bounce (void *socket) static void bounce (void *socket)
{ {
int more; int more;
size_t more_size = sizeof(more); size_t more_size = sizeof (more);
do { do {
zmq_msg_t recv_part, sent_part; zmq_msg_t recv_part, sent_part;
int rc = zmq_msg_init (&recv_part); int rc = zmq_msg_init (&recv_part);
...@@ -50,13 +50,13 @@ static void bounce (void *socket) ...@@ -50,13 +50,13 @@ static void bounce (void *socket)
rc = zmq_msg_send (&sent_part, socket, more ? ZMQ_SNDMORE : 0); rc = zmq_msg_send (&sent_part, socket, more ? ZMQ_SNDMORE : 0);
assert (rc != -1); assert (rc != -1);
zmq_msg_close(&recv_part); zmq_msg_close (&recv_part);
} while (more); } while (more);
} }
int main (void) int main (void)
{ {
setup_test_environment(); setup_test_environment ();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
...@@ -70,10 +70,6 @@ int main (void) ...@@ -70,10 +70,6 @@ int main (void)
rc = zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int)); rc = zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int));
assert (rc == 0); assert (rc == 0);
int rcvtimeo = 100;
rc = zmq_setsockopt (req, ZMQ_RCVTIMEO, &rcvtimeo, sizeof (int));
assert (rc == 0);
rc = zmq_bind (req, "tcp://127.0.0.1:5555"); rc = zmq_bind (req, "tcp://127.0.0.1:5555");
assert (rc == 0); assert (rc == 0);
...@@ -142,8 +138,8 @@ int main (void) ...@@ -142,8 +138,8 @@ int main (void)
// communication pipes. For example pipe from req to rep[0] should not be // communication pipes. For example pipe from req to rep[0] should not be
// closed after executing Case 1. So rep[0] should be the next to receive, // closed after executing Case 1. So rep[0] should be the next to receive,
// not rep[1]. // not rep[1].
s_send_seq(req, "J", SEQ_END); s_send_seq (req, "J", SEQ_END);
s_recv_seq(rep [0], "J", SEQ_END); s_recv_seq (rep [0], "J", SEQ_END);
close_zero_linger (req); close_zero_linger (req);
for (size_t peer = 0; peer < services; peer++) for (size_t peer = 0; peer < services; peer++)
...@@ -171,18 +167,14 @@ int main (void) ...@@ -171,18 +167,14 @@ int main (void)
// Setup ROUTER socket as server but do not bind it just yet // Setup ROUTER socket as server but do not bind it just yet
void *router = zmq_socket (ctx, ZMQ_ROUTER); void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert(router); assert (router);
int timeout = 1000;
rc = zmq_setsockopt (router, ZMQ_RCVTIMEO, &timeout, sizeof(int));
assert (rc == 0);
// Send two requests // Send two requests
s_send_seq (req, "TO_BE_DISCARDED", SEQ_END); s_send_seq (req, "TO_BE_DISCARDED", SEQ_END);
s_send_seq (req, "TO_BE_ANSWERED", SEQ_END); s_send_seq (req, "TO_BE_ANSWERED", SEQ_END);
// Bind server allowing it to receive messages // Bind server allowing it to receive messages
rc = zmq_bind(router, "tcp://127.0.0.1:5555"); rc = zmq_bind (router, "tcp://127.0.0.1:5555");
assert (rc == 0); assert (rc == 0);
// Read the two messages and send them back as is // Read the two messages and send them back as is
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment