Commit fa467d34 authored by Simon Giesecke's avatar Simon Giesecke

Problem: test_hwm_pubsub not using test framework

Solution: migrate to unity
parent c178193c
......@@ -470,7 +470,8 @@ tests_test_hwm_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_hwm_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_hwm_pubsub_SOURCES = tests/test_hwm_pubsub.cpp
tests_test_hwm_pubsub_LDADD = src/libzmq.la
tests_test_hwm_pubsub_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_hwm_pubsub_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_reqrep_device_SOURCES = tests/test_reqrep_device.cpp
tests_test_reqrep_device_LDADD = src/libzmq.la
......
......@@ -28,30 +28,35 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
// const int MAX_SENDS = 10000;
int test_defaults (int send_hwm, int msgCnt)
{
void *ctx = zmq_ctx_new ();
assert (ctx);
int rc;
// Set up bind socket
void *pub_socket = zmq_socket (ctx, ZMQ_PUB);
assert (pub_socket);
rc = zmq_bind (pub_socket, "inproc://a");
assert (rc == 0);
void *pub_socket = test_context_socket (ZMQ_PUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub_socket, "inproc://a"));
// Set up connect socket
void *sub_socket = zmq_socket (ctx, ZMQ_SUB);
assert (sub_socket);
rc = zmq_connect (sub_socket, "inproc://a");
assert (rc == 0);
void *sub_socket = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, "inproc://a"));
//set a hwm on publisher
rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
rc = zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
// Send until we block
int send_count = 0;
......@@ -67,17 +72,11 @@ int test_defaults (int send_hwm, int msgCnt)
++recv_count;
}
assert (send_hwm == recv_count);
TEST_ASSERT_EQUAL_INT (send_hwm, recv_count);
// Clean up
rc = zmq_close (sub_socket);
assert (rc == 0);
rc = zmq_close (pub_socket);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
test_context_socket_close (sub_socket);
test_context_socket_close (pub_socket);
return recv_count;
}
......@@ -96,89 +95,68 @@ int receive (void *socket)
int test_blocking (int send_hwm, int msgCnt)
{
void *ctx = zmq_ctx_new ();
assert (ctx);
int rc;
// Set up bind socket
void *pub_socket = zmq_socket (ctx, ZMQ_PUB);
assert (pub_socket);
rc = zmq_bind (pub_socket, "inproc://a");
assert (rc == 0);
void *pub_socket = test_context_socket (ZMQ_PUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub_socket, "inproc://a"));
// Set up connect socket
void *sub_socket = zmq_socket (ctx, ZMQ_SUB);
assert (sub_socket);
rc = zmq_connect (sub_socket, "inproc://a");
assert (rc == 0);
void *sub_socket = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, "inproc://a"));
//set a hwm on publisher
rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm)));
int wait = 1;
rc = zmq_setsockopt (pub_socket, ZMQ_XPUB_NODROP, &wait, sizeof (wait));
rc = zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pub_socket, ZMQ_XPUB_NODROP, &wait, sizeof (wait)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
// Send until we block
int send_count = 0;
int recv_count = 0;
while (send_count < msgCnt) {
rc = zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT);
const int rc = zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT);
if (rc == 0) {
++send_count;
} else if (-1 == rc) {
assert (EAGAIN == errno);
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
recv_count += receive (sub_socket);
assert (recv_count == send_count);
TEST_ASSERT_EQUAL_INT (send_count, recv_count);
}
}
recv_count += receive (sub_socket);
// Clean up
rc = zmq_close (sub_socket);
assert (rc == 0);
rc = zmq_close (pub_socket);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
test_context_socket_close (sub_socket);
test_context_socket_close (pub_socket);
return recv_count;
}
// hwm should apply to the messages that have already been received
// with hwm 11024: send 9999 msg, receive 9999, send 1100, receive 1100
void test_reset_hwm ()
{
int first_count = 9999;
int second_count = 1100;
const int first_count = 9999;
const int second_count = 1100;
int hwm = 11024;
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
int rc;
// Set up bind socket
void *pub_socket = zmq_socket (ctx, ZMQ_PUB);
assert (pub_socket);
rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &hwm, sizeof (hwm));
assert (rc == 0);
rc = zmq_bind (pub_socket, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (pub_socket, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
void *pub_socket = test_context_socket (ZMQ_PUB);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
bind_loopback_ipv4 (pub_socket, my_endpoint, MAX_SOCKET_STRING);
// Set up connect socket
void *sub_socket = zmq_socket (ctx, ZMQ_SUB);
assert (sub_socket);
rc = zmq_setsockopt (sub_socket, ZMQ_RCVHWM, &hwm, sizeof (hwm));
assert (rc == 0);
rc = zmq_connect (sub_socket, my_endpoint);
assert (rc == 0);
rc = zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0);
assert (rc == 0);
void *sub_socket = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub_socket, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, my_endpoint));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
msleep (SETTLE_TIME);
......@@ -187,7 +165,7 @@ void test_reset_hwm ()
while (send_count < first_count
&& zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
++send_count;
assert (first_count == send_count);
TEST_ASSERT_EQUAL_INT (first_count, send_count);
msleep (SETTLE_TIME);
......@@ -196,7 +174,7 @@ void test_reset_hwm ()
while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) {
++recv_count;
}
assert (first_count == recv_count);
TEST_ASSERT_EQUAL_INT (first_count, recv_count);
msleep (SETTLE_TIME);
......@@ -205,7 +183,7 @@ void test_reset_hwm ()
while (send_count < second_count
&& zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
++send_count;
assert (second_count == send_count);
TEST_ASSERT_EQUAL_INT (second_count, send_count);
msleep (SETTLE_TIME);
......@@ -214,35 +192,32 @@ void test_reset_hwm ()
while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) {
++recv_count;
}
assert (second_count == recv_count);
TEST_ASSERT_EQUAL_INT (second_count, recv_count);
// Clean up
rc = zmq_close (sub_socket);
assert (rc == 0);
rc = zmq_close (pub_socket);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
test_context_socket_close (sub_socket);
test_context_socket_close (pub_socket);
}
int main (void)
void test_defaults_1000 ()
{
setup_test_environment ();
int count;
// send 1000 msg on hwm 1000, receive 1000
count = test_defaults (1000, 1000);
assert (count == 1000);
TEST_ASSERT_EQUAL_INT (1000, test_defaults (1000, 1000));
}
void test_blocking_2000 ()
{
// send 6000 msg on hwm 2000, drops above hwm, only receive hwm
count = test_blocking (2000, 6000);
assert (count == 6000);
TEST_ASSERT_EQUAL_INT (6000, test_blocking (2000, 6000));
}
// hwm should apply to the messages that have already been received
test_reset_hwm ();
int main ()
{
setup_test_environment ();
return 0;
UNITY_BEGIN ();
RUN_TEST (test_defaults_1000);
RUN_TEST (test_blocking_2000);
RUN_TEST (test_reset_hwm);
return UNITY_END ();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment