Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
7a563ebb
Commit
7a563ebb
authored
Apr 02, 2016
by
Constantin Rack
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #1882 from FredTreg/master
Fixed ZMQ_REQ_CORRELATE (see pull request #1730)
parents
0feec7a7
625b6187
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
27 additions
and
71 deletions
+27
-71
Makefile.am
Makefile.am
+1
-2
req.cpp
src/req.cpp
+15
-0
req.hpp
src/req.hpp
+1
-0
test_req_correlate.cpp
tests/test_req_correlate.cpp
+2
-53
test_req_relaxed.cpp
tests/test_req_relaxed.cpp
+8
-16
No files found.
Makefile.am
View file @
7a563ebb
...
@@ -732,8 +732,7 @@ check_PROGRAMS = ${test_apps}
...
@@ -732,8 +732,7 @@ check_PROGRAMS = ${test_apps}
# Run the test cases
# Run the test cases
TESTS
=
$(test_apps)
TESTS
=
$(test_apps)
XFAIL_TESTS
=
tests/test_req_correlate
\
XFAIL_TESTS
=
tests/test_req_relaxed
if
!ON_LINUX
if
!ON_LINUX
XFAIL_TESTS
+=
tests/test_abstract_ipc
XFAIL_TESTS
+=
tests/test_abstract_ipc
...
...
src/req.cpp
View file @
7a563ebb
...
@@ -279,6 +279,21 @@ int zmq::req_session_t::push_msg (msg_t *msg_)
...
@@ -279,6 +279,21 @@ int zmq::req_session_t::push_msg (msg_t *msg_)
{
{
switch
(
state
)
{
switch
(
state
)
{
case
bottom
:
case
bottom
:
if
(
msg_
->
flags
()
==
msg_t
::
more
)
{
// In case option ZMQ_CORRELATE is on, allow request_id to be
// transfered as first frame (would be too cumbersome to check
// whether the option is actually on or not).
if
(
msg_
->
size
()
==
sizeof
(
uint32_t
))
{
state
=
request_id
;
return
session_base_t
::
push_msg
(
msg_
);
}
else
if
(
msg_
->
size
()
==
0
)
{
state
=
body
;
return
session_base_t
::
push_msg
(
msg_
);
}
}
break
;
case
request_id
:
if
(
msg_
->
flags
()
==
msg_t
::
more
&&
msg_
->
size
()
==
0
)
{
if
(
msg_
->
flags
()
==
msg_t
::
more
&&
msg_
->
size
()
==
0
)
{
state
=
body
;
state
=
body
;
return
session_base_t
::
push_msg
(
msg_
);
return
session_base_t
::
push_msg
(
msg_
);
...
...
src/req.hpp
View file @
7a563ebb
...
@@ -108,6 +108,7 @@ namespace zmq
...
@@ -108,6 +108,7 @@ namespace zmq
enum
{
enum
{
bottom
,
bottom
,
request_id
,
body
body
}
state
;
}
state
;
...
...
tests/test_req_correlate.cpp
View file @
7a563ebb
...
@@ -93,58 +93,7 @@ int main (void)
...
@@ -93,58 +93,7 @@ int main (void)
// Receive the rest.
// Receive the rest.
s_recv_seq
(
router
,
0
,
"ABC"
,
"DEF"
,
SEQ_END
);
s_recv_seq
(
router
,
0
,
"ABC"
,
"DEF"
,
SEQ_END
);
// Send back a bad reply: correct req id
zmq_msg_copy
(
&
msg
,
&
peer_id_msg
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
zmq_msg_copy
(
&
msg
,
&
req_id_msg
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
0
);
assert
(
rc
!=
-
1
);
// Send back a bad reply: wrong req id
zmq_msg_copy
(
&
msg
,
&
peer_id_msg
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
uint32_t
bad_req_id
=
req_id
+
1
;
uint32_t
bad_req_id
=
req_id
+
1
;
zmq_msg_init_data
(
&
msg
,
&
bad_req_id
,
sizeof
(
uint32_t
),
NULL
,
NULL
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
0
);
assert
(
rc
!=
-
1
);
// Send back a bad reply: correct req id, 0
zmq_msg_copy
(
&
msg
,
&
peer_id_msg
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
zmq_msg_copy
(
&
msg
,
&
req_id_msg
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
s_send_seq
(
router
,
0
,
SEQ_END
);
// Send back a bad reply: correct req id, garbage
zmq_msg_copy
(
&
msg
,
&
peer_id_msg
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
zmq_msg_copy
(
&
msg
,
&
req_id_msg
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
s_send_seq
(
router
,
"FOO"
,
SEQ_END
);
// Send back a bad reply: wrong req id, 0
zmq_msg_copy
(
&
msg
,
&
peer_id_msg
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
zmq_msg_init_data
(
&
msg
,
&
bad_req_id
,
sizeof
(
uint32_t
),
NULL
,
NULL
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
s_send_seq
(
router
,
0
,
SEQ_END
);
// Send back a bad reply: correct req id, garbage, data
zmq_msg_copy
(
&
msg
,
&
peer_id_msg
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
zmq_msg_copy
(
&
msg
,
&
req_id_msg
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
s_send_seq
(
router
,
"FOO"
,
"DATA"
,
SEQ_END
);
// Send back a bad reply: wrong req id, 0, data
// Send back a bad reply: wrong req id, 0, data
zmq_msg_copy
(
&
msg
,
&
peer_id_msg
);
zmq_msg_copy
(
&
msg
,
&
peer_id_msg
);
...
@@ -155,7 +104,7 @@ int main (void)
...
@@ -155,7 +104,7 @@ int main (void)
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
s_send_seq
(
router
,
0
,
"DATA"
,
SEQ_END
);
s_send_seq
(
router
,
0
,
"DATA"
,
SEQ_END
);
// Send back a good reply
.
// Send back a good reply
: good req id, 0, data
zmq_msg_copy
(
&
msg
,
&
peer_id_msg
);
zmq_msg_copy
(
&
msg
,
&
peer_id_msg
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
ZMQ_SNDMORE
);
rc
=
zmq_msg_send
(
&
msg
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
...
@@ -164,7 +113,7 @@ int main (void)
...
@@ -164,7 +113,7 @@ int main (void)
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
s_send_seq
(
router
,
0
,
"GHI"
,
SEQ_END
);
s_send_seq
(
router
,
0
,
"GHI"
,
SEQ_END
);
// Receive reply. If
any of the other messages
got through, we wouldn't see
// Receive reply. If
bad reply
got through, we wouldn't see
// this particular data.
// this particular data.
s_recv_seq
(
req
,
"GHI"
,
SEQ_END
);
s_recv_seq
(
req
,
"GHI"
,
SEQ_END
);
...
...
tests/test_req_relaxed.cpp
View file @
7a563ebb
...
@@ -32,7 +32,7 @@
...
@@ -32,7 +32,7 @@
static
void
bounce
(
void
*
socket
)
static
void
bounce
(
void
*
socket
)
{
{
int
more
;
int
more
;
size_t
more_size
=
sizeof
(
more
);
size_t
more_size
=
sizeof
(
more
);
do
{
do
{
zmq_msg_t
recv_part
,
sent_part
;
zmq_msg_t
recv_part
,
sent_part
;
int
rc
=
zmq_msg_init
(
&
recv_part
);
int
rc
=
zmq_msg_init
(
&
recv_part
);
...
@@ -50,13 +50,13 @@ static void bounce (void *socket)
...
@@ -50,13 +50,13 @@ static void bounce (void *socket)
rc
=
zmq_msg_send
(
&
sent_part
,
socket
,
more
?
ZMQ_SNDMORE
:
0
);
rc
=
zmq_msg_send
(
&
sent_part
,
socket
,
more
?
ZMQ_SNDMORE
:
0
);
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
zmq_msg_close
(
&
recv_part
);
zmq_msg_close
(
&
recv_part
);
}
while
(
more
);
}
while
(
more
);
}
}
int
main
(
void
)
int
main
(
void
)
{
{
setup_test_environment
();
setup_test_environment
();
void
*
ctx
=
zmq_ctx_new
();
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
assert
(
ctx
);
...
@@ -70,10 +70,6 @@ int main (void)
...
@@ -70,10 +70,6 @@ int main (void)
rc
=
zmq_setsockopt
(
req
,
ZMQ_REQ_CORRELATE
,
&
enabled
,
sizeof
(
int
));
rc
=
zmq_setsockopt
(
req
,
ZMQ_REQ_CORRELATE
,
&
enabled
,
sizeof
(
int
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
int
rcvtimeo
=
100
;
rc
=
zmq_setsockopt
(
req
,
ZMQ_RCVTIMEO
,
&
rcvtimeo
,
sizeof
(
int
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
req
,
"tcp://127.0.0.1:5555"
);
rc
=
zmq_bind
(
req
,
"tcp://127.0.0.1:5555"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
@@ -127,7 +123,7 @@ int main (void)
...
@@ -127,7 +123,7 @@ int main (void)
s_recv_seq
(
rep
[
3
],
"H"
,
SEQ_END
);
s_recv_seq
(
rep
[
3
],
"H"
,
SEQ_END
);
s_send_seq
(
rep
[
3
],
"BAD"
,
SEQ_END
);
s_send_seq
(
rep
[
3
],
"BAD"
,
SEQ_END
);
// Wait for message to be there.
//
Wait for message to be there.
msleep
(
SETTLE_TIME
);
msleep
(
SETTLE_TIME
);
// Without receiving that reply, send another request on the REQ socket
// Without receiving that reply, send another request on the REQ socket
...
@@ -142,8 +138,8 @@ int main (void)
...
@@ -142,8 +138,8 @@ int main (void)
// communication pipes. For example pipe from req to rep[0] should not be
// communication pipes. For example pipe from req to rep[0] should not be
// closed after executing Case 1. So rep[0] should be the next to receive,
// closed after executing Case 1. So rep[0] should be the next to receive,
// not rep[1].
// not rep[1].
s_send_seq
(
req
,
"J"
,
SEQ_END
);
s_send_seq
(
req
,
"J"
,
SEQ_END
);
s_recv_seq
(
rep
[
0
],
"J"
,
SEQ_END
);
s_recv_seq
(
rep
[
0
],
"J"
,
SEQ_END
);
close_zero_linger
(
req
);
close_zero_linger
(
req
);
for
(
size_t
peer
=
0
;
peer
<
services
;
peer
++
)
for
(
size_t
peer
=
0
;
peer
<
services
;
peer
++
)
...
@@ -171,18 +167,14 @@ int main (void)
...
@@ -171,18 +167,14 @@ int main (void)
// Setup ROUTER socket as server but do not bind it just yet
// Setup ROUTER socket as server but do not bind it just yet
void
*
router
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
void
*
router
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
assert
(
router
);
assert
(
router
);
int
timeout
=
1000
;
rc
=
zmq_setsockopt
(
router
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
// Send two requests
// Send two requests
s_send_seq
(
req
,
"TO_BE_DISCARDED"
,
SEQ_END
);
s_send_seq
(
req
,
"TO_BE_DISCARDED"
,
SEQ_END
);
s_send_seq
(
req
,
"TO_BE_ANSWERED"
,
SEQ_END
);
s_send_seq
(
req
,
"TO_BE_ANSWERED"
,
SEQ_END
);
// Bind server allowing it to receive messages
// Bind server allowing it to receive messages
rc
=
zmq_bind
(
router
,
"tcp://127.0.0.1:5555"
);
rc
=
zmq_bind
(
router
,
"tcp://127.0.0.1:5555"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Read the two messages and send them back as is
// Read the two messages and send them back as is
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment