Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
407bd3b1
Commit
407bd3b1
authored
Aug 20, 2018
by
Simon Giesecke
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Problem: test_spec_pushpull not yet using unity
Solution: migrate to unity
parent
261dd1f9
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
116 additions
and
138 deletions
+116
-138
Makefile.am
Makefile.am
+2
-1
test_spec_pushpull.cpp
tests/test_spec_pushpull.cpp
+114
-137
No files found.
Makefile.am
View file @
407bd3b1
...
@@ -598,7 +598,8 @@ tests_test_spec_router_LDADD = src/libzmq.la ${UNITY_LIBS}
...
@@ -598,7 +598,8 @@ tests_test_spec_router_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_spec_router_CPPFLAGS
=
${
UNITY_CPPFLAGS
}
tests_test_spec_router_CPPFLAGS
=
${
UNITY_CPPFLAGS
}
tests_test_spec_pushpull_SOURCES
=
tests/test_spec_pushpull.cpp
tests_test_spec_pushpull_SOURCES
=
tests/test_spec_pushpull.cpp
tests_test_spec_pushpull_LDADD
=
src/libzmq.la
tests_test_spec_pushpull_LDADD
=
src/libzmq.la
${
UNITY_LIBS
}
tests_test_spec_pushpull_CPPFLAGS
=
${
UNITY_CPPFLAGS
}
tests_test_req_correlate_SOURCES
=
tests/test_req_correlate.cpp
tests_test_req_correlate_SOURCES
=
tests/test_req_correlate.cpp
tests_test_req_correlate_LDADD
=
src/libzmq.la
${
UNITY_LIBS
}
tests_test_req_correlate_LDADD
=
src/libzmq.la
${
UNITY_LIBS
}
...
...
tests/test_spec_pushpull.cpp
View file @
407bd3b1
...
@@ -28,33 +28,40 @@
...
@@ -28,33 +28,40 @@
*/
*/
#include "testutil.hpp"
#include "testutil.hpp"
#include "testutil_unity.hpp"
void
setUp
()
{
setup_test_context
();
}
void
tearDown
()
{
teardown_test_context
();
}
const
char
*
bind_address
=
0
;
char
connect_address
[
MAX_SOCKET_STRING
];
char
connect_address
[
MAX_SOCKET_STRING
];
void
test_push_round_robin_out
(
void
*
ctx_
)
// PUSH: SHALL route outgoing messages to connected peers using a
// round-robin strategy.
void
test_push_round_robin_out
(
const
char
*
bind_address_
)
{
{
void
*
push
=
zmq_socket
(
ctx_
,
ZMQ_PUSH
);
void
*
push
=
test_context_socket
(
ZMQ_PUSH
);
assert
(
push
);
int
rc
=
zmq_bind
(
push
,
bind_address
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
push
,
bind_address_
));
assert
(
rc
==
0
);
size_t
len
=
MAX_SOCKET_STRING
;
size_t
len
=
MAX_SOCKET_STRING
;
rc
=
zmq_getsockopt
(
push
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
len
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_getsockopt
(
push
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
len
)
);
const
size_t
services
=
5
;
const
size_t
services
=
5
;
void
*
pulls
[
services
];
void
*
pulls
[
services
];
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
pulls
[
peer
]
=
zmq_socket
(
ctx_
,
ZMQ_PULL
);
pulls
[
peer
]
=
test_context_socket
(
ZMQ_PULL
);
assert
(
pulls
[
peer
]);
int
timeout
=
250
;
int
timeout
=
250
;
rc
=
zmq_setsockopt
(
pulls
[
peer
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
pulls
[
peer
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
pulls
[
peer
],
connect_address
));
rc
=
zmq_connect
(
pulls
[
peer
],
connect_address
);
assert
(
rc
==
0
);
}
}
// Wait for connections.
// Wait for connections.
...
@@ -72,34 +79,29 @@ void test_push_round_robin_out (void *ctx_)
...
@@ -72,34 +79,29 @@ void test_push_round_robin_out (void *ctx_)
s_recv_seq
(
pulls
[
peer
],
"DEF"
,
SEQ_END
);
s_recv_seq
(
pulls
[
peer
],
"DEF"
,
SEQ_END
);
}
}
close_zero_linger
(
push
);
test_context_socket_
close_zero_linger
(
push
);
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
close_zero_linger
(
pulls
[
peer
]);
test_context_socket_close_zero_linger
(
pulls
[
peer
]);
// Wait for disconnects.
msleep
(
SETTLE_TIME
);
}
}
void
test_pull_fair_queue_in
(
void
*
ctx_
)
// PULL: SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
void
test_pull_fair_queue_in
(
const
char
*
bind_address_
)
{
{
void
*
pull
=
zmq_socket
(
ctx_
,
ZMQ_PULL
);
void
*
pull
=
test_context_socket
(
ZMQ_PULL
);
assert
(
pull
);
int
rc
=
zmq_bind
(
pull
,
bind_address
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
pull
,
bind_address_
));
assert
(
rc
==
0
);
size_t
len
=
MAX_SOCKET_STRING
;
size_t
len
=
MAX_SOCKET_STRING
;
rc
=
zmq_getsockopt
(
pull
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
len
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_getsockopt
(
pull
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
len
)
);
const
unsigned
char
services
=
5
;
const
unsigned
char
services
=
5
;
void
*
pushs
[
services
];
void
*
pushs
[
services
];
for
(
unsigned
char
peer
=
0
;
peer
<
services
;
++
peer
)
{
for
(
unsigned
char
peer
=
0
;
peer
<
services
;
++
peer
)
{
pushs
[
peer
]
=
zmq_socket
(
ctx_
,
ZMQ_PUSH
);
pushs
[
peer
]
=
test_context_socket
(
ZMQ_PUSH
);
assert
(
pushs
[
peer
]);
rc
=
zmq_connect
(
pushs
[
peer
],
connect_address
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
pushs
[
peer
],
connect_address
));
assert
(
rc
==
0
);
}
}
// Wait for connections.
// Wait for connections.
...
@@ -127,83 +129,72 @@ void test_pull_fair_queue_in (void *ctx_)
...
@@ -127,83 +129,72 @@ void test_pull_fair_queue_in (void *ctx_)
msleep
(
SETTLE_TIME
);
msleep
(
SETTLE_TIME
);
zmq_msg_t
msg
;
zmq_msg_t
msg
;
rc
=
zmq_msg_init
(
&
msg
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_init
(
&
msg
));
assert
(
rc
==
0
);
// Expect to pull one from each first
// Expect to pull one from each first
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
rc
=
zmq_msg_recv
(
&
msg
,
pull
,
0
);
TEST_ASSERT_EQUAL_INT
(
assert
(
rc
==
2
);
2
,
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_recv
(
&
msg
,
pull
,
0
))
);
const
char
*
str
=
(
const
char
*
)
zmq_msg_data
(
&
msg
);
const
char
*
str
=
(
const
char
*
)
zmq_msg_data
(
&
msg
);
first_half
-=
str
[
0
];
first_half
-=
str
[
0
];
}
}
assert
(
first_half
==
0
);
TEST_ASSERT_EQUAL_INT
(
0
,
first_half
);
// And then get the second batch
// And then get the second batch
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
rc
=
zmq_msg_recv
(
&
msg
,
pull
,
0
);
TEST_ASSERT_EQUAL_INT
(
assert
(
rc
==
2
);
2
,
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_recv
(
&
msg
,
pull
,
0
))
);
const
char
*
str
=
(
const
char
*
)
zmq_msg_data
(
&
msg
);
const
char
*
str
=
(
const
char
*
)
zmq_msg_data
(
&
msg
);
second_half
-=
str
[
0
];
second_half
-=
str
[
0
];
}
}
assert
(
second_half
==
0
);
TEST_ASSERT_EQUAL_INT
(
0
,
second_half
);
rc
=
zmq_msg_close
(
&
msg
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_close
(
&
msg
));
assert
(
rc
==
0
);
close_zero_linger
(
pull
);
test_context_socket_
close_zero_linger
(
pull
);
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
close_zero_linger
(
pushs
[
peer
]);
test_context_socket_close_zero_linger
(
pushs
[
peer
]);
// Wait for disconnects.
msleep
(
SETTLE_TIME
);
}
}
void
test_push_block_on_send_no_peers
(
void
*
ctx_
)
// PUSH: SHALL block on sending, or return a suitable error, when it has no
// available peers.
void
test_push_block_on_send_no_peers
(
const
char
*
bind_address_
)
{
{
void
*
sc
=
zmq_socket
(
ctx_
,
ZMQ_PUSH
);
void
*
sc
=
test_context_socket
(
ZMQ_PUSH
);
assert
(
sc
);
int
timeout
=
250
;
int
timeout
=
250
;
int
rc
=
zmq_setsockopt
(
sc
,
ZMQ_SNDTIMEO
,
&
timeout
,
sizeof
(
timeout
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
sc
,
ZMQ_SNDTIMEO
,
&
timeout
,
sizeof
(
timeout
)));
rc
=
zmq_send
(
sc
,
0
,
0
,
ZMQ_DONTWAIT
);
assert
(
rc
==
-
1
);
assert
(
errno
==
EAGAIN
);
rc
=
zmq_send
(
sc
,
0
,
0
,
0
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_send
(
sc
,
0
,
0
,
ZMQ_DONTWAIT
));
assert
(
rc
==
-
1
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_send
(
sc
,
0
,
0
,
0
));
assert
(
errno
==
EAGAIN
);
rc
=
zmq_close
(
sc
);
test_context_socket_close
(
sc
);
assert
(
rc
==
0
);
}
}
void
test_destroy_queue_on_disconnect
(
void
*
ctx_
)
// PUSH and PULL: SHALL create this queue when a peer connects to it. If
// this peer disconnects, the socket SHALL destroy its queue and SHALL
// discard any messages it contains.
void
test_destroy_queue_on_disconnect
(
const
char
*
bind_address_
)
{
{
void
*
a
=
zmq_socket
(
ctx_
,
ZMQ_PUSH
);
void
*
a
=
test_context_socket
(
ZMQ_PUSH
);
assert
(
a
);
int
hwm
=
1
;
int
hwm
=
1
;
int
rc
=
zmq_setsockopt
(
a
,
ZMQ_SNDHWM
,
&
hwm
,
sizeof
(
hwm
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
a
,
ZMQ_SNDHWM
,
&
hwm
,
sizeof
(
hwm
))
);
rc
=
zmq_bind
(
a
,
bind_address
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
a
,
bind_address_
));
assert
(
rc
==
0
);
size_t
len
=
MAX_SOCKET_STRING
;
size_t
len
=
MAX_SOCKET_STRING
;
rc
=
zmq_getsockopt
(
a
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
len
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_getsockopt
(
a
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
len
)
);
void
*
b
=
zmq_socket
(
ctx_
,
ZMQ_PULL
);
void
*
b
=
test_context_socket
(
ZMQ_PULL
);
assert
(
b
);
rc
=
zmq_setsockopt
(
b
,
ZMQ_RCVHWM
,
&
hwm
,
sizeof
(
hwm
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
b
,
ZMQ_RCVHWM
,
&
hwm
,
sizeof
(
hwm
))
);
rc
=
zmq_connect
(
b
,
connect_address
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
b
,
connect_address
));
assert
(
rc
==
0
);
// Send two messages, one should be stuck in A's outgoing queue, the other
// Send two messages, one should be stuck in A's outgoing queue, the other
// arrives at B.
// arrives at B.
...
@@ -211,93 +202,79 @@ void test_destroy_queue_on_disconnect (void *ctx_)
...
@@ -211,93 +202,79 @@ void test_destroy_queue_on_disconnect (void *ctx_)
s_send_seq
(
a
,
"DEF"
,
SEQ_END
);
s_send_seq
(
a
,
"DEF"
,
SEQ_END
);
// Both queues should now be full, indicated by A blocking on send.
// Both queues should now be full, indicated by A blocking on send.
rc
=
zmq_send
(
a
,
0
,
0
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_send
(
a
,
0
,
0
,
ZMQ_DONTWAIT
));
assert
(
rc
==
-
1
);
assert
(
errno
==
EAGAIN
);
rc
=
zmq_disconnect
(
b
,
connect_address
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_disconnect
(
b
,
connect_address
));
assert
(
rc
==
0
);
// Disconnect may take time and need command processing.
// Disconnect may take time and need command processing.
zmq_pollitem_t
poller
[
2
]
=
{{
a
,
0
,
0
,
0
},
{
b
,
0
,
0
,
0
}};
zmq_pollitem_t
poller
[
2
]
=
{{
a
,
0
,
0
,
0
},
{
b
,
0
,
0
,
0
}};
rc
=
zmq_poll
(
poller
,
2
,
100
);
TEST_ASSERT_EQUAL_INT
(
assert
(
rc
==
0
);
0
,
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_poll
(
poller
,
2
,
100
))
);
rc
=
zmq_poll
(
poller
,
2
,
100
);
TEST_ASSERT_EQUAL_INT
(
assert
(
rc
==
0
);
0
,
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_poll
(
poller
,
2
,
100
))
);
zmq_msg_t
msg
;
zmq_msg_t
msg
;
rc
=
zmq_msg_init
(
&
msg
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_init
(
&
msg
));
assert
(
rc
==
0
);
// Can't receive old data on B.
// Can't receive old data on B.
rc
=
zmq_msg_recv
(
&
msg
,
b
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_msg_recv
(
&
msg
,
b
,
ZMQ_DONTWAIT
));
assert
(
rc
==
-
1
);
assert
(
errno
==
EAGAIN
);
// Sending fails.
// Sending fails.
rc
=
zmq_send
(
a
,
0
,
0
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_send
(
a
,
0
,
0
,
ZMQ_DONTWAIT
));
assert
(
rc
==
-
1
);
assert
(
errno
==
EAGAIN
);
// Reconnect B
// Reconnect B
rc
=
zmq_connect
(
b
,
connect_address
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
b
,
connect_address
));
assert
(
rc
==
0
);
// Still can't receive old data on B.
// Still can't receive old data on B.
rc
=
zmq_msg_recv
(
&
msg
,
b
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_msg_recv
(
&
msg
,
b
,
ZMQ_DONTWAIT
));
assert
(
rc
==
-
1
);
assert
(
errno
==
EAGAIN
);
// two messages should be sendable before the queues are filled up.
// two messages should be sendable before the queues are filled up.
s_send_seq
(
a
,
"ABC"
,
SEQ_END
);
s_send_seq
(
a
,
"ABC"
,
SEQ_END
);
s_send_seq
(
a
,
"DEF"
,
SEQ_END
);
s_send_seq
(
a
,
"DEF"
,
SEQ_END
);
rc
=
zmq_send
(
a
,
0
,
0
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_send
(
a
,
0
,
0
,
ZMQ_DONTWAIT
));
assert
(
rc
==
-
1
);
assert
(
errno
==
EAGAIN
);
rc
=
zmq_msg_close
(
&
msg
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_close
(
&
msg
));
assert
(
rc
==
0
);
close_zero_linger
(
a
);
test_context_socket_close_zero_linger
(
a
);
close_zero_linger
(
b
);
test_context_socket_close_zero_linger
(
b
);
// Wait for disconnects.
msleep
(
SETTLE_TIME
);
}
}
int
main
(
void
)
#define def_test_spec_pushpull(name, bind_address_) \
{
void test_spec_pushpull_##name##_push_round_robin_out () \
setup_test_environment
();
{ \
void
*
ctx
=
zmq_ctx_new
();
test_push_round_robin_out (bind_address_); \
assert
(
ctx
);
} \
void test_spec_pushpull_##name##_pull_fair_queue_in () \
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://127.0.0.1:*"
};
{ \
test_pull_fair_queue_in (bind_address_); \
for
(
int
transport
=
0
;
transport
<
2
;
++
transport
)
{
} \
bind_address
=
binds
[
transport
];
void test_spec_pushpull_##name##_push_block_on_send_no_peers () \
{ \
// PUSH: SHALL route outgoing messages to connected peers using a
test_push_block_on_send_no_peers (bind_address_); \
// round-robin strategy.
} \
test_push_round_robin_out
(
ctx
);
void test_spec_pushpull_##name##_destroy_queue_on_disconnect () \
{ \
// PULL: SHALL receive incoming messages from its peers using a fair-queuing
test_destroy_queue_on_disconnect (bind_address_); \
// strategy.
}
test_pull_fair_queue_in
(
ctx
);
// PUSH: SHALL block on sending, or return a suitable error, when it has no
def_test_spec_pushpull
(
inproc
,
"inproc://a"
)
// available peers.
test_push_block_on_send_no_peers
(
ctx
);
// PUSH and PULL: SHALL create this queue when a peer connects to it. If
def_test_spec_pushpull
(
tcp
,
"tcp://127.0.0.1:*"
)
// this peer disconnects, the socket SHALL destroy its queue and SHALL
// discard any messages it contains.
// *** Test disabled until libzmq does this properly ***
// test_destroy_queue_on_disconnect (ctx);
}
int
rc
=
zmq_ctx_term
(
ctx
);
int
main
()
assert
(
rc
==
0
);
{
setup_test_environment
();
return
0
;
UNITY_BEGIN
();
RUN_TEST
(
test_spec_pushpull_inproc_push_round_robin_out
);
RUN_TEST
(
test_spec_pushpull_tcp_push_round_robin_out
);
RUN_TEST
(
test_spec_pushpull_inproc_pull_fair_queue_in
);
RUN_TEST
(
test_spec_pushpull_tcp_pull_fair_queue_in
);
RUN_TEST
(
test_spec_pushpull_inproc_push_block_on_send_no_peers
);
RUN_TEST
(
test_spec_pushpull_tcp_push_block_on_send_no_peers
);
// TODO Tests disabled until libzmq does this properly
//RUN_TEST (test_spec_pushpull_inproc_destroy_queue_on_disconnect);
//RUN_TEST (test_spec_pushpull_tcp_destroy_queue_on_disconnect);
return
UNITY_END
();
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment