Commit b30cbfc1 authored by Simon Giesecke's avatar Simon Giesecke

Problem: test_spec_dealer not using test framework

Solution: migrate to Unity
parent d81a041f
...@@ -572,7 +572,8 @@ tests_test_spec_rep_SOURCES = tests/test_spec_rep.cpp ...@@ -572,7 +572,8 @@ tests_test_spec_rep_SOURCES = tests/test_spec_rep.cpp
tests_test_spec_rep_LDADD = src/libzmq.la tests_test_spec_rep_LDADD = src/libzmq.la
tests_test_spec_dealer_SOURCES = tests/test_spec_dealer.cpp tests_test_spec_dealer_SOURCES = tests/test_spec_dealer.cpp
tests_test_spec_dealer_LDADD = src/libzmq.la tests_test_spec_dealer_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_spec_dealer_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_spec_router_SOURCES = tests/test_spec_router.cpp tests_test_spec_router_SOURCES = tests/test_spec_router.cpp
tests_test_spec_router_LDADD = src/libzmq.la tests_test_spec_router_LDADD = src/libzmq.la
......
...@@ -28,33 +28,38 @@ ...@@ -28,33 +28,38 @@
*/ */
#include "testutil.hpp" #include "testutil.hpp"
#include "testutil_unity.hpp"
const char *bind_address = 0; void setUp ()
char connect_address[MAX_SOCKET_STRING]; {
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
void test_round_robin_out (void *ctx)
// SHALL route outgoing messages to available peers using a round-robin
// strategy.
void test_round_robin_out (const char *bind_address)
{ {
void *dealer = zmq_socket (ctx, ZMQ_DEALER); void *dealer = test_context_socket (ZMQ_DEALER);
assert (dealer);
int rc = zmq_bind (dealer, bind_address); char connect_address[MAX_SOCKET_STRING];
assert (rc == 0); test_bind (dealer, bind_address, connect_address, sizeof (connect_address));
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (dealer, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
const size_t services = 5; const size_t services = 5;
void *rep[services]; void *rep[services];
for (size_t peer = 0; peer < services; ++peer) { for (size_t peer = 0; peer < services; ++peer) {
rep[peer] = zmq_socket (ctx, ZMQ_REP); rep[peer] = test_context_socket (ZMQ_REP);
assert (rep[peer]);
int timeout = 250; int timeout = 250;
rc = zmq_setsockopt (rep[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)); TEST_ASSERT_SUCCESS_ERRNO (
assert (rc == 0); zmq_setsockopt (rep[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)));
rc = zmq_connect (rep[peer], connect_address); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (rep[peer], connect_address));
assert (rc == 0);
} }
// Wait for connections. // Wait for connections.
...@@ -71,50 +76,42 @@ void test_round_robin_out (void *ctx) ...@@ -71,50 +76,42 @@ void test_round_robin_out (void *ctx)
for (size_t peer = 0; peer < services; ++peer) for (size_t peer = 0; peer < services; ++peer)
s_recv_seq (rep[peer], "ABC", SEQ_END); s_recv_seq (rep[peer], "ABC", SEQ_END);
rc = zmq_msg_close (&msg); TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
assert (rc == 0);
close_zero_linger (dealer); test_context_socket_close_zero_linger (dealer);
for (size_t peer = 0; peer < services; ++peer) for (size_t peer = 0; peer < services; ++peer)
close_zero_linger (rep[peer]); test_context_socket_close_zero_linger (rep[peer]);
// Wait for disconnects.
msleep (SETTLE_TIME);
} }
void test_fair_queue_in (void *ctx) // SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
void test_fair_queue_in (const char *bind_address)
{ {
void *receiver = zmq_socket (ctx, ZMQ_DEALER); void *receiver = test_context_socket (ZMQ_DEALER);
assert (receiver);
int timeout = 250; int timeout = 250;
int rc = zmq_setsockopt (receiver, ZMQ_RCVTIMEO, &timeout, sizeof (int)); TEST_ASSERT_SUCCESS_ERRNO (
assert (rc == 0); zmq_setsockopt (receiver, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
rc = zmq_bind (receiver, bind_address); char connect_address[MAX_SOCKET_STRING];
assert (rc == 0); test_bind (receiver, bind_address, connect_address,
size_t len = MAX_SOCKET_STRING; sizeof (connect_address));
rc = zmq_getsockopt (receiver, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
const size_t services = 5; const size_t services = 5;
void *senders[services]; void *senders[services];
for (size_t peer = 0; peer < services; ++peer) { for (size_t peer = 0; peer < services; ++peer) {
senders[peer] = zmq_socket (ctx, ZMQ_DEALER); senders[peer] = test_context_socket (ZMQ_DEALER);
assert (senders[peer]);
rc = TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (senders[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)); zmq_setsockopt (senders[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)));
assert (rc == 0);
rc = zmq_connect (senders[peer], connect_address); TEST_ASSERT_SUCCESS_ERRNO (
assert (rc == 0); zmq_connect (senders[peer], connect_address));
} }
zmq_msg_t msg; zmq_msg_t msg;
rc = zmq_msg_init (&msg); TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
assert (rc == 0);
s_send_seq (senders[0], "A", SEQ_END); s_send_seq (senders[0], "A", SEQ_END);
s_recv_seq (receiver, "A", SEQ_END); s_recv_seq (receiver, "A", SEQ_END);
...@@ -133,135 +130,105 @@ void test_fair_queue_in (void *ctx) ...@@ -133,135 +130,105 @@ void test_fair_queue_in (void *ctx)
for (size_t peer = 0; peer < services; ++peer) for (size_t peer = 0; peer < services; ++peer)
s_recv_seq (receiver, "B", SEQ_END); s_recv_seq (receiver, "B", SEQ_END);
rc = zmq_msg_close (&msg); TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
assert (rc == 0);
close_zero_linger (receiver); test_context_socket_close_zero_linger (receiver);
for (size_t peer = 0; peer < services; ++peer) for (size_t peer = 0; peer < services; ++peer)
close_zero_linger (senders[peer]); test_context_socket_close_zero_linger (senders[peer]);
// Wait for disconnects.
msleep (SETTLE_TIME);
} }
void test_destroy_queue_on_disconnect (void *ctx) // SHALL create a double queue when a peer connects to it. If this peer
// disconnects, the DEALER socket SHALL destroy its double queue and SHALL
// discard any messages it contains.
void test_destroy_queue_on_disconnect (const char *bind_address)
{ {
void *A = zmq_socket (ctx, ZMQ_DEALER); void *A = test_context_socket (ZMQ_DEALER);
assert (A);
int rc = zmq_bind (A, bind_address); char connect_address[MAX_SOCKET_STRING];
assert (rc == 0); test_bind (A, bind_address, connect_address, sizeof (connect_address));
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (A, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
void *B = zmq_socket (ctx, ZMQ_DEALER); void *B = test_context_socket (ZMQ_DEALER);
assert (B);
rc = zmq_connect (B, connect_address); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (B, connect_address));
assert (rc == 0);
// Send a message in both directions // Send a message in both directions
s_send_seq (A, "ABC", SEQ_END); s_send_seq (A, "ABC", SEQ_END);
s_send_seq (B, "DEF", SEQ_END); s_send_seq (B, "DEF", SEQ_END);
rc = zmq_disconnect (B, connect_address); TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (B, connect_address));
assert (rc == 0);
// Disconnect may take time and need command processing. // Disconnect may take time and need command processing.
zmq_pollitem_t poller[2] = {{A, 0, 0, 0}, {B, 0, 0, 0}}; zmq_pollitem_t poller[2] = {{A, 0, 0, 0}, {B, 0, 0, 0}};
rc = zmq_poll (poller, 2, 100); TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100));
assert (rc == 0); TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100));
rc = zmq_poll (poller, 2, 100);
assert (rc == 0);
// No messages should be available, sending should fail. // No messages should be available, sending should fail.
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init (&msg); zmq_msg_init (&msg);
rc = zmq_send (A, 0, 0, ZMQ_DONTWAIT); TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (A, 0, 0, ZMQ_DONTWAIT));
assert (rc == -1);
assert (errno == EAGAIN);
rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT); TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, A, ZMQ_DONTWAIT));
assert (rc == -1);
assert (errno == EAGAIN);
// After a reconnect of B, the messages should still be gone // After a reconnect of B, the messages should still be gone
rc = zmq_connect (B, connect_address); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (B, connect_address));
assert (rc == 0);
rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT); TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, A, ZMQ_DONTWAIT));
assert (rc == -1);
assert (errno == EAGAIN);
rc = zmq_msg_recv (&msg, B, ZMQ_DONTWAIT); TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, B, ZMQ_DONTWAIT));
assert (rc == -1);
assert (errno == EAGAIN);
rc = zmq_msg_close (&msg); TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
assert (rc == 0);
close_zero_linger (A); test_context_socket_close_zero_linger (A);
close_zero_linger (B); test_context_socket_close_zero_linger (B);
// Wait for disconnects.
msleep (SETTLE_TIME);
} }
void test_block_on_send_no_peers (void *ctx) // SHALL block on sending, or return a suitable error, when it has no connected peers.
void test_block_on_send_no_peers (const char *bind_address)
{ {
void *sc = zmq_socket (ctx, ZMQ_DEALER); void *sc = test_context_socket (ZMQ_DEALER);
assert (sc);
int timeout = 250; int timeout = 250;
int rc = zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout)); TEST_ASSERT_SUCCESS_ERRNO (
assert (rc == 0); zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout)));
rc = zmq_send (sc, 0, 0, ZMQ_DONTWAIT);
assert (rc == -1);
assert (errno == EAGAIN);
rc = zmq_send (sc, 0, 0, 0); TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, ZMQ_DONTWAIT));
assert (rc == -1); TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, 0));
assert (errno == EAGAIN);
rc = zmq_close (sc); test_context_socket_close (sc);
assert (rc == 0);
} }
#define TEST_CASES(name, bind_address) \
void test_round_robin_out_##name () \
{ \
test_round_robin_out (bind_address); \
} \
void test_fair_queue_in_##name () { test_fair_queue_in (bind_address); } \
void test_block_on_send_no_peers_##name () \
{ \
test_block_on_send_no_peers (bind_address); \
}
TEST_CASES (inproc, "inproc://a")
TEST_CASES (tcp, "tcp://127.0.0.1:*")
int main (void) int main (void)
{ {
setup_test_environment (); setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
const char *binds[] = {"inproc://a", "tcp://127.0.0.1:*"};
for (int transports = 0; transports < 2; ++transports) {
bind_address = binds[transports];
// SHALL route outgoing messages to available peers using a round-robin UNITY_BEGIN ();
// strategy.
test_round_robin_out (ctx);
// SHALL receive incoming messages from its peers using a fair-queuing RUN_TEST (test_round_robin_out_inproc);
// strategy. RUN_TEST (test_fair_queue_in_inproc);
test_fair_queue_in (ctx); RUN_TEST (test_block_on_send_no_peers_inproc);
// SHALL block on sending, or return a suitable error, when it has no connected peers. RUN_TEST (test_round_robin_out_tcp);
test_block_on_send_no_peers (ctx); RUN_TEST (test_fair_queue_in_tcp);
RUN_TEST (test_block_on_send_no_peers_tcp);
// SHALL create a double queue when a peer connects to it. If this peer
// disconnects, the DEALER socket SHALL destroy its double queue and SHALL
// discard any messages it contains.
// *** Test disabled until libzmq does this properly ***
// test_destroy_queue_on_disconnect (ctx);
}
int rc = zmq_ctx_term (ctx); // TODO *** Test disabled until libzmq does this properly ***
assert (rc == 0); // test_destroy_queue_on_disconnect (ctx);
return 0; return UNITY_END ();
} }
...@@ -217,6 +217,24 @@ void *test_context_socket_close (void *socket) ...@@ -217,6 +217,24 @@ void *test_context_socket_close (void *socket)
return socket; return socket;
} }
void *test_context_socket_close_zero_linger (void *socket)
{
const int linger = 0;
int rc = zmq_setsockopt (socket, ZMQ_LINGER, &linger, sizeof (linger));
TEST_ASSERT_TRUE (rc == 0 || zmq_errno () == ETERM);
return test_context_socket_close (socket);
}
void test_bind (void *socket,
const char *bind_address,
char *my_endpoint,
size_t len)
{
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (socket, bind_address));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (socket, ZMQ_LAST_ENDPOINT, my_endpoint, &len));
}
void bind_loopback (void *socket, int ipv6, char *my_endpoint, size_t len) void bind_loopback (void *socket, int ipv6, char *my_endpoint, size_t len)
{ {
if (ipv6 && !is_ipv6_available ()) { if (ipv6 && !is_ipv6_available ()) {
...@@ -225,10 +243,9 @@ void bind_loopback (void *socket, int ipv6, char *my_endpoint, size_t len) ...@@ -225,10 +243,9 @@ void bind_loopback (void *socket, int ipv6, char *my_endpoint, size_t len)
TEST_ASSERT_SUCCESS_ERRNO ( TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (socket, ZMQ_IPV6, &ipv6, sizeof (int))); zmq_setsockopt (socket, ZMQ_IPV6, &ipv6, sizeof (int)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_bind (socket, ipv6 ? "tcp://[::1]:*" : "tcp://127.0.0.1:*")); test_bind (socket, ipv6 ? "tcp://[::1]:*" : "tcp://127.0.0.1:*",
TEST_ASSERT_SUCCESS_ERRNO ( my_endpoint, len);
zmq_getsockopt (socket, ZMQ_LAST_ENDPOINT, my_endpoint, &len));
} }
void bind_loopback_ipv4 (void *socket, char *my_endpoint, size_t len) void bind_loopback_ipv4 (void *socket, char *my_endpoint, size_t len)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment