Unverified Commit e0fe7f10 authored by Constantin Rack's avatar Constantin Rack Committed by GitHub

Merge pull request #3232 from sigiesec/migrate-tests-to-unity

Migrate tests to unity, reduce duplication
parents 76f2edd0 430b97fc
......@@ -533,7 +533,8 @@ tests_test_router_mandatory_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_router_mandatory_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_router_mandatory_hwm_SOURCES = tests/test_router_mandatory_hwm.cpp
tests_test_router_mandatory_hwm_LDADD = src/libzmq.la
tests_test_router_mandatory_hwm_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_router_mandatory_hwm_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_router_handover_SOURCES = tests/test_router_handover.cpp
tests_test_router_handover_LDADD = src/libzmq.la ${UNITY_LIBS}
......@@ -552,10 +553,12 @@ tests_test_stream_empty_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_stream_empty_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_stream_timeout_SOURCES = tests/test_stream_timeout.cpp
tests_test_stream_timeout_LDADD = src/libzmq.la
tests_test_stream_timeout_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_stream_timeout_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_stream_disconnect_SOURCES = tests/test_stream_disconnect.cpp
tests_test_stream_disconnect_LDADD = src/libzmq.la
tests_test_stream_disconnect_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_stream_disconnect_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_disconnect_inproc_SOURCES = tests/test_disconnect_inproc.cpp
tests_test_disconnect_inproc_LDADD = src/libzmq.la
......@@ -623,7 +626,8 @@ tests_test_issue_566_SOURCES = tests/test_issue_566.cpp
tests_test_issue_566_LDADD = src/libzmq.la
tests_test_proxy_SOURCES = tests/test_proxy.cpp
tests_test_proxy_LDADD = src/libzmq.la
tests_test_proxy_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_proxy_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_proxy_single_socket_SOURCES = tests/test_proxy_single_socket.cpp
tests_test_proxy_single_socket_LDADD = src/libzmq.la
......@@ -760,8 +764,7 @@ test_apps += \
tests/test_pair_ipc \
tests/test_rebind_ipc \
tests/test_reqrep_ipc \
tests/test_use_fd_ipc \
tests/test_use_fd_tcp \
tests/test_use_fd \
tests/test_zmq_poll_fd \
tests/test_timeo \
tests/test_filter_ipc
......@@ -792,15 +795,11 @@ tests_test_filter_ipc_SOURCES = tests/test_filter_ipc.cpp
tests_test_filter_ipc_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_filter_ipc_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_use_fd_ipc_SOURCES = \
tests/test_use_fd_ipc.cpp \
tests_test_use_fd_SOURCES = \
tests/test_use_fd.cpp \
tests/testutil.hpp
tests_test_use_fd_ipc_LDADD = src/libzmq.la
tests_test_use_fd_tcp_SOURCES = \
tests/test_use_fd_tcp.cpp \
tests/testutil.hpp
tests_test_use_fd_tcp_LDADD = src/libzmq.la
tests_test_use_fd_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_use_fd_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_zmq_poll_fd_SOURCES = tests/test_zmq_poll_fd.cpp
tests_test_zmq_poll_fd_LDADD = src/libzmq.la
......@@ -916,7 +915,8 @@ tests_test_thread_safe_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_thread_safe_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_timers_SOURCES = tests/test_timers.cpp
tests_test_timers_LDADD = src/libzmq.la
tests_test_timers_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_timers_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_radio_dish_SOURCES = tests/test_radio_dish.cpp
tests_test_radio_dish_LDADD = src/libzmq.la ${UNITY_LIBS}
......
......@@ -90,8 +90,7 @@ if(NOT WIN32)
test_filter_ipc
test_stream_exceeds_buffer
test_router_mandatory_hwm
test_use_fd_ipc
test_use_fd_tcp
test_use_fd
test_zmq_poll_fd
)
if(HAVE_FORK)
......
......@@ -28,21 +28,32 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
void setUp ()
{
setup_test_context ();
}
// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
void tearDown ()
{
teardown_test_context ();
}
#define CONTENT_SIZE 13
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
#define CONTENT_SIZE 12
#define CONTENT_SIZE_MAX 32
#define ROUTING_ID_SIZE 10
#define ROUTING_ID_SIZE_MAX 32
......@@ -52,7 +63,6 @@
struct thread_data
{
void *ctx;
int id;
};
......@@ -76,46 +86,38 @@ void *g_workers_pkts_out = NULL;
static void client_task (void *db_)
{
struct thread_data *databag = (struct thread_data *) db_;
struct thread_data *databag = static_cast<struct thread_data *> (db_);
// Endpoint socket gets random port to avoid test failing when port in use
void *endpoint = zmq_socket (databag->ctx, ZMQ_PAIR);
assert (endpoint);
void *endpoint = test_context_socket (ZMQ_PAIR);
int linger = 0;
int rc = zmq_setsockopt (endpoint, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (endpoint, ZMQ_LINGER, &linger, sizeof (linger)));
char endpoint_source[256];
sprintf (endpoint_source, "inproc://endpoint%d", databag->id);
rc = zmq_connect (endpoint, endpoint_source);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (endpoint, endpoint_source));
char *my_endpoint = s_recv (endpoint);
assert (my_endpoint);
TEST_ASSERT_NOT_NULL (my_endpoint);
void *client = zmq_socket (databag->ctx, ZMQ_DEALER);
assert (client);
void *client = test_context_socket (ZMQ_DEALER);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (databag->ctx, ZMQ_SUB);
assert (control);
rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_connect (control, "inproc://control");
assert (rc == 0);
void *control = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control"));
char content[CONTENT_SIZE_MAX];
// Set random routing id to make tracing easier
char routing_id[ROUTING_ID_SIZE];
sprintf (routing_id, "%04X-%04X", rand () % 0xFFFF, rand () % 0xFFFF);
rc =
zmq_setsockopt (client, ZMQ_ROUTING_ID, routing_id,
ROUTING_ID_SIZE); // includes '\0' as an helper for printf
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
client, ZMQ_ROUTING_ID, routing_id,
ROUTING_ID_SIZE)); // includes '\0' as an helper for printf
linger = 0;
rc = zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint));
zmq_pollitem_t items[] = {{client, 0, ZMQ_POLLIN, 0},
{control, 0, ZMQ_POLLIN, 0}};
......@@ -123,27 +125,27 @@ static void client_task (void *db_)
bool run = true;
bool keep_sending = true;
while (run) {
// Tick once per 200 ms, pulling in arriving messages
int centitick;
for (centitick = 0; centitick < 20; centitick++) {
for (int centitick = 0; centitick < 20; centitick++) {
zmq_poll (items, 2, 10);
if (items[0].revents & ZMQ_POLLIN) {
int rcvmore;
size_t sz = sizeof (rcvmore);
rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0);
assert (rc == CONTENT_SIZE);
TEST_ASSERT_EQUAL_INT (
CONTENT_SIZE, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (
client, content, CONTENT_SIZE_MAX, 0)));
if (is_verbose)
printf (
"client receive - routing_id = %s content = %s\n",
routing_id, content);
// Check that message is still the same
assert (memcmp (content, "request #", 9) == 0);
rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (!rcvmore);
TEST_ASSERT_EQUAL_STRING_LEN ("request #", content, 9);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz));
TEST_ASSERT_FALSE (rcvmore);
}
if (items[1].revents & ZMQ_POLLIN) {
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
int rc = TEST_ASSERT_SUCCESS_ERRNO (
zmq_recv (control, content, CONTENT_SIZE_MAX, 0));
if (rc > 0) {
content[rc] = 0; // NULL-terminate the command string
......@@ -170,17 +172,13 @@ static void client_task (void *db_)
routing_id, request_nbr);
zmq_atomic_counter_inc (g_clients_pkts_out);
rc = zmq_send (client, content, CONTENT_SIZE, 0);
assert (rc == CONTENT_SIZE);
send_string_expect_success (client, content, 0);
}
}
rc = zmq_close (client);
assert (rc == 0);
rc = zmq_close (control);
assert (rc == 0);
rc = zmq_close (endpoint);
assert (rc == 0);
test_context_socket_close (client);
test_context_socket_close (control);
test_context_socket_close (endpoint);
free (my_endpoint);
}
......@@ -190,80 +188,64 @@ static void client_task (void *db_)
// one request at a time but one client can talk to multiple workers at
// once.
static void server_worker (void *ctx_);
static void server_worker (void * /*dummy_*/);
void server_task (void *ctx_)
void server_task (void * /*dummy_*/)
{
// Frontend socket talks to clients over TCP
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *frontend = zmq_socket (ctx_, ZMQ_ROUTER);
assert (frontend);
void *frontend = test_context_socket (ZMQ_ROUTER);
int linger = 0;
int rc = zmq_setsockopt (frontend, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_bind (frontend, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (frontend, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (frontend, ZMQ_LINGER, &linger, sizeof (linger)));
bind_loopback_ipv4 (frontend, my_endpoint, sizeof my_endpoint);
// Backend socket talks to workers over inproc
void *backend = zmq_socket (ctx_, ZMQ_DEALER);
assert (backend);
rc = zmq_setsockopt (backend, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_bind (backend, "inproc://backend");
assert (rc == 0);
void *backend = test_context_socket (ZMQ_DEALER);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (backend, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, "inproc://backend"));
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx_, ZMQ_REP);
assert (control);
rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_connect (control, "inproc://control_proxy");
assert (rc == 0);
void *control = test_context_socket (ZMQ_REP);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control_proxy"));
// Launch pool of worker threads, precise number is not critical
int thread_nbr;
void *threads[5];
for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
threads[thread_nbr] = zmq_threadstart (&server_worker, ctx_);
threads[thread_nbr] = zmq_threadstart (&server_worker, NULL);
// Endpoint socket sends random port to avoid test failing when port in use
void *endpoint_receivers[QT_CLIENTS];
char endpoint_source[256];
for (int i = 0; i < QT_CLIENTS; ++i) {
endpoint_receivers[i] = zmq_socket (ctx_, ZMQ_PAIR);
assert (endpoint_receivers[i]);
rc = zmq_setsockopt (endpoint_receivers[i], ZMQ_LINGER, &linger,
sizeof (linger));
assert (rc == 0);
endpoint_receivers[i] = test_context_socket (ZMQ_PAIR);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
endpoint_receivers[i], ZMQ_LINGER, &linger, sizeof (linger)));
sprintf (endpoint_source, "inproc://endpoint%d", i);
rc = zmq_bind (endpoint_receivers[i], endpoint_source);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_bind (endpoint_receivers[i], endpoint_source));
}
for (int i = 0; i < QT_CLIENTS; ++i) {
rc = s_send (endpoint_receivers[i], my_endpoint);
assert (rc > 0);
TEST_ASSERT_SUCCESS_ERRNO (s_send (endpoint_receivers[i], my_endpoint));
}
// Connect backend to frontend via a proxy
rc = zmq_proxy_steerable (frontend, backend, NULL, control);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_proxy_steerable (frontend, backend, NULL, control));
for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
zmq_threadclose (threads[thread_nbr]);
rc = zmq_close (frontend);
assert (rc == 0);
rc = zmq_close (backend);
assert (rc == 0);
rc = zmq_close (control);
assert (rc == 0);
test_context_socket_close (frontend);
test_context_socket_close (backend);
test_context_socket_close (control);
for (int i = 0; i < QT_CLIENTS; ++i) {
rc = zmq_close (endpoint_receivers[i]);
assert (rc == 0);
test_context_socket_close (endpoint_receivers[i]);
}
}
......@@ -271,25 +253,20 @@ void server_task (void *ctx_)
// of replies back, with random delays between replies:
// The comments in the first column, if suppressed, makes it a poller version
static void server_worker (void *ctx_)
static void server_worker (void * /*dummy_*/)
{
void *worker = zmq_socket (ctx_, ZMQ_DEALER);
assert (worker);
void *worker = test_context_socket (ZMQ_DEALER);
int linger = 0;
int rc = zmq_setsockopt (worker, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_connect (worker, "inproc://backend");
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (worker, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (worker, "inproc://backend"));
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx_, ZMQ_SUB);
assert (control);
rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_connect (control, "inproc://control");
assert (rc == 0);
void *control = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control"));
char content[CONTENT_SIZE_MAX]; // bigger than what we need to check that
char routing_id[ROUTING_ID_SIZE_MAX]; // the size received is the size sent
......@@ -297,7 +274,7 @@ static void server_worker (void *ctx_)
bool run = true;
bool keep_sending = true;
while (run) {
rc = zmq_recv (control, content, CONTENT_SIZE_MAX,
int rc = zmq_recv (control, content, CONTENT_SIZE_MAX,
ZMQ_DONTWAIT); // usually, rc == -1 (no message)
if (rc > 0) {
content[rc] = 0; // NULL-terminate the command string
......@@ -312,16 +289,17 @@ static void server_worker (void *ctx_)
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
rc = zmq_recv (worker, routing_id, ROUTING_ID_SIZE_MAX, ZMQ_DONTWAIT);
if (rc == ROUTING_ID_SIZE) {
rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
assert (rc == CONTENT_SIZE);
TEST_ASSERT_EQUAL_INT (
CONTENT_SIZE, TEST_ASSERT_SUCCESS_ERRNO (
zmq_recv (worker, content, CONTENT_SIZE_MAX, 0)));
if (is_verbose)
printf ("server receive - routing_id = %s content = %s\n",
routing_id, content);
// Send 0..4 replies back
if (keep_sending) {
int reply, replies = rand () % 5;
for (reply = 0; reply < replies; reply++) {
const int replies = rand () % 5;
for (int reply = 0; reply < replies; reply++) {
// Sleep for some fraction of a second
msleep (rand () % 10 + 1);
......@@ -331,19 +309,15 @@ static void server_worker (void *ctx_)
routing_id);
zmq_atomic_counter_inc (g_workers_pkts_out);
rc = zmq_send (worker, routing_id, ROUTING_ID_SIZE,
send_string_expect_success (worker, routing_id,
ZMQ_SNDMORE);
assert (rc == ROUTING_ID_SIZE);
rc = zmq_send (worker, content, CONTENT_SIZE, 0);
assert (rc == CONTENT_SIZE);
send_string_expect_success (worker, content, 0);
}
}
}
}
rc = zmq_close (worker);
assert (rc == 0);
rc = zmq_close (control);
assert (rc == 0);
test_context_socket_close (worker);
test_context_socket_close (control);
}
uint64_t recv_stat (void *sock_, bool last_)
......@@ -351,19 +325,18 @@ uint64_t recv_stat (void *sock_, bool last_)
uint64_t res;
zmq_msg_t stats_msg;
int rc = zmq_msg_init (&stats_msg);
assert (rc == 0);
rc = zmq_recvmsg (sock_, &stats_msg, 0);
assert (rc == sizeof (uint64_t));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg));
TEST_ASSERT_EQUAL_INT (
sizeof (uint64_t),
TEST_ASSERT_SUCCESS_ERRNO (zmq_recvmsg (sock_, &stats_msg, 0)));
memcpy (&res, zmq_msg_data (&stats_msg), zmq_msg_size (&stats_msg));
rc = zmq_msg_close (&stats_msg);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
int more;
size_t moresz = sizeof more;
rc = zmq_getsockopt (sock_, ZMQ_RCVMORE, &more, &moresz);
assert (rc == 0);
assert ((last_ && !more) || (!last_ && more));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (sock_, ZMQ_RCVMORE, &more, &moresz));
TEST_ASSERT_TRUE ((last_ && !more) || (!last_ && more));
return res;
}
......@@ -373,10 +346,8 @@ uint64_t recv_stat (void *sock_, bool last_)
void check_proxy_stats (void *control_proxy_)
{
zmq_proxy_stats_t total_stats;
int rc;
rc = zmq_send (control_proxy_, "STATISTICS", 10, 0);
assert (rc == 10);
send_string_expect_success (control_proxy_, "STATISTICS", 0);
// first frame of the reply contains FRONTEND stats:
total_stats.frontend.msg_in = recv_stat (control_proxy_, false);
......@@ -411,67 +382,54 @@ void check_proxy_stats (void *control_proxy_)
printf ("workers sent out %d replies\n",
zmq_atomic_counter_value (g_workers_pkts_out));
}
assert (total_stats.frontend.msg_in
== (unsigned) zmq_atomic_counter_value (g_clients_pkts_out));
assert (total_stats.frontend.msg_out
== (unsigned) zmq_atomic_counter_value (g_workers_pkts_out));
assert (total_stats.backend.msg_in
== (unsigned) zmq_atomic_counter_value (g_workers_pkts_out));
assert (total_stats.backend.msg_out
== (unsigned) zmq_atomic_counter_value (g_clients_pkts_out));
TEST_ASSERT_EQUAL_UINT64 (
(unsigned) zmq_atomic_counter_value (g_clients_pkts_out),
total_stats.frontend.msg_in);
TEST_ASSERT_EQUAL_UINT64 (
(unsigned) zmq_atomic_counter_value (g_workers_pkts_out),
total_stats.frontend.msg_out);
TEST_ASSERT_EQUAL_UINT64 (
(unsigned) zmq_atomic_counter_value (g_workers_pkts_out),
total_stats.backend.msg_in);
TEST_ASSERT_EQUAL_UINT64 (
(unsigned) zmq_atomic_counter_value (g_clients_pkts_out),
total_stats.backend.msg_out);
}
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int main (void)
void test_proxy ()
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
g_clients_pkts_out = zmq_atomic_counter_new ();
g_workers_pkts_out = zmq_atomic_counter_new ();
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_PUB);
assert (control);
void *control = test_context_socket (ZMQ_PUB);
int linger = 0;
int rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_bind (control, "inproc://control");
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (control, "inproc://control"));
// Control socket receives terminate command from main over inproc
void *control_proxy = zmq_socket (ctx, ZMQ_REQ);
assert (control_proxy);
rc = zmq_setsockopt (control_proxy, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_bind (control_proxy, "inproc://control_proxy");
assert (rc == 0);
void *control_proxy = test_context_socket (ZMQ_REQ);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (control_proxy, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_bind (control_proxy, "inproc://control_proxy"));
void *threads[QT_CLIENTS + 1];
struct thread_data databags[QT_CLIENTS + 1];
for (int i = 0; i < QT_CLIENTS; i++) {
databags[i].ctx = ctx;
databags[i].id = i;
threads[i] = zmq_threadstart (&client_task, &databags[i]);
}
threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx);
threads[QT_CLIENTS] = zmq_threadstart (&server_task, NULL);
msleep (500); // Run for 500 ms then quit
if (is_verbose)
printf ("stopping all clients and server workers\n");
rc = zmq_send (control, "STOP", 4, 0);
assert (rc == 4);
send_string_expect_success (control, "STOP", 0);
msleep (500); // Wait for all clients and workers to STOP
#ifdef ZMQ_BUILD_DRAFT_API
if (is_verbose)
printf ("retrieving stats from the proxy\n");
......@@ -480,24 +438,27 @@ int main (void)
if (is_verbose)
printf ("shutting down all clients and server workers\n");
rc = zmq_send (control, "TERMINATE", 9, 0);
assert (rc == 9);
send_string_expect_success (control, "TERMINATE", 0);
if (is_verbose)
printf ("shutting down the proxy\n");
rc = zmq_send (control_proxy, "TERMINATE", 9, 0);
assert (rc == 9);
send_string_expect_success (control_proxy, "TERMINATE", 0);
rc = zmq_close (control);
assert (rc == 0);
rc = zmq_close (control_proxy);
assert (rc == 0);
test_context_socket_close (control);
test_context_socket_close (control_proxy);
for (int i = 0; i < QT_CLIENTS + 1; i++)
zmq_threadclose (threads[i]);
}
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int main (void)
{
setup_test_environment ();
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
UNITY_BEGIN ();
RUN_TEST (test_proxy);
return UNITY_END ();
}
......@@ -28,6 +28,17 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
// DEBUG shouldn't be defined in sources as it will cause a redefined symbol
// error when it is defined in the build configuration. It appears that the
......@@ -36,101 +47,82 @@
//#define DEBUG 0
#define TRACE_ENABLED 0
int main (void)
void test_router_mandatory_hwm ()
{
int rc;
if (TRACE_ENABLED)
fprintf (stderr, "Staring router mandatory HWM test ...\n");
setup_test_environment ();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert (router);
void *router = test_context_socket (ZMQ_ROUTER);
// Configure router socket to mandatory routing and set HWM and linger
int mandatory = 1;
rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &mandatory,
sizeof (mandatory));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY,
&mandatory, sizeof (mandatory)));
int sndhwm = 1;
rc = zmq_setsockopt (router, ZMQ_SNDHWM, &sndhwm, sizeof (sndhwm));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (router, ZMQ_SNDHWM, &sndhwm, sizeof (sndhwm)));
int linger = 1;
rc = zmq_setsockopt (router, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (router, ZMQ_LINGER, &linger, sizeof (linger)));
rc = zmq_bind (router, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
bind_loopback_ipv4 (router, my_endpoint, sizeof my_endpoint);
// Create dealer called "X" and connect it to our router, configure HWM
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
rc = zmq_setsockopt (dealer, ZMQ_ROUTING_ID, "X", 1);
assert (rc == 0);
void *dealer = test_context_socket (ZMQ_DEALER);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (dealer, ZMQ_ROUTING_ID, "X", 1));
int rcvhwm = 1;
rc = zmq_setsockopt (dealer, ZMQ_RCVHWM, &rcvhwm, sizeof (rcvhwm));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (dealer, ZMQ_RCVHWM, &rcvhwm, sizeof (rcvhwm)));
rc = zmq_connect (dealer, my_endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, my_endpoint));
// Get message from dealer to know when connection is ready
char buffer[255];
rc = zmq_send (dealer, "Hello", 5, 0);
assert (rc == 5);
rc = zmq_recv (router, buffer, 255, 0);
assert (rc == 1);
assert (buffer[0] == 'X');
send_string_expect_success (dealer, "Hello", 0);
recv_string_expect_success (router, "X", 0);
int i;
const int buf_size = 65536;
char buf[buf_size];
memset (buf, 0, buf_size);
const uint8_t buf[buf_size] = {0};
// Send first batch of messages
for (i = 0; i < 100000; ++i) {
if (TRACE_ENABLED)
fprintf (stderr, "Sending message %d ...\n", i);
rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE);
const int rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE);
if (rc == -1 && zmq_errno () == EAGAIN)
break;
assert (rc == 1);
rc = zmq_send (router, buf, buf_size, ZMQ_DONTWAIT);
assert (rc == buf_size);
TEST_ASSERT_EQUAL_INT (1, rc);
send_array_expect_success (router, buf, ZMQ_DONTWAIT);
}
// This should fail after one message but kernel buffering could
// skew results
assert (i < 10);
TEST_ASSERT_LESS_THAN_INT (10, i);
msleep (1000);
// Send second batch of messages
for (; i < 100000; ++i) {
if (TRACE_ENABLED)
fprintf (stderr, "Sending message %d (part 2) ...\n", i);
rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE);
const int rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE);
if (rc == -1 && zmq_errno () == EAGAIN)
break;
assert (rc == 1);
rc = zmq_send (router, buf, buf_size, ZMQ_DONTWAIT);
assert (rc == buf_size);
TEST_ASSERT_EQUAL_INT (1, rc);
send_array_expect_success (router, buf, ZMQ_DONTWAIT);
}
// This should fail after two messages but kernel buffering could
// skew results
assert (i < 20);
TEST_ASSERT_LESS_THAN_INT (20, i);
if (TRACE_ENABLED)
fprintf (stderr, "Done sending messages.\n");
rc = zmq_close (router);
assert (rc == 0);
rc = zmq_close (dealer);
assert (rc == 0);
test_context_socket_close (router);
test_context_socket_close (dealer);
}
rc = zmq_ctx_term (ctx);
assert (rc == 0);
int main ()
{
setup_test_environment ();
return 0;
UNITY_BEGIN ();
RUN_TEST (test_router_mandatory_hwm);
return UNITY_END ();
}
......@@ -28,6 +28,17 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
static const int SERVER = 0;
static const int CLIENT = 1;
......@@ -56,33 +67,20 @@ bool has_more (void *socket_)
return more != 0;
}
bool get_routing_id (void *socket_, char *data_, size_t *size_)
{
int rc = zmq_getsockopt (socket_, ZMQ_ROUTING_ID, data_, size_);
return rc == 0;
}
int main (int, char **)
void test_stream_disconnect ()
{
setup_test_environment ();
size_t len = MAX_SOCKET_STRING;
char bind_endpoint[MAX_SOCKET_STRING];
char connect_endpoint[MAX_SOCKET_STRING];
void *context = zmq_ctx_new ();
void *sockets[2];
int rc = 0;
sockets[SERVER] = zmq_socket (context, ZMQ_STREAM);
sockets[SERVER] = test_context_socket (ZMQ_STREAM);
int enabled = 1;
rc = zmq_setsockopt (sockets[SERVER], ZMQ_STREAM_NOTIFY, &enabled,
sizeof (enabled));
assert (rc == 0);
rc = zmq_bind (sockets[SERVER], "tcp://0.0.0.0:*");
assert (rc == 0);
rc =
zmq_getsockopt (sockets[SERVER], ZMQ_LAST_ENDPOINT, bind_endpoint, &len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
sockets[SERVER], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sockets[SERVER], "tcp://0.0.0.0:*"));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (sockets[SERVER], ZMQ_LAST_ENDPOINT, bind_endpoint, &len));
// Apparently Windows can't connect to 0.0.0.0. A better fix would be welcome.
#ifdef ZMQ_HAVE_WINDOWS
......@@ -92,76 +90,58 @@ int main (int, char **)
strcpy (connect_endpoint, bind_endpoint);
#endif
sockets[CLIENT] = zmq_socket (context, ZMQ_STREAM);
rc = zmq_setsockopt (sockets[CLIENT], ZMQ_STREAM_NOTIFY, &enabled,
sizeof (enabled));
assert (rc == 0);
rc = zmq_connect (sockets[CLIENT], connect_endpoint);
assert (rc == 0);
sockets[CLIENT] = test_context_socket (ZMQ_STREAM);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
sockets[CLIENT], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sockets[CLIENT], connect_endpoint));
// wait for connect notification
// Server: Grab the 1st frame (peer routing id).
zmq_msg_t peer_frame;
rc = zmq_msg_init (&peer_frame);
assert (rc == 0);
rc = zmq_msg_recv (&peer_frame, sockets[SERVER], 0);
assert (rc != -1);
assert (zmq_msg_size (&peer_frame) > 0);
assert (has_more (sockets[SERVER]));
rc = zmq_msg_close (&peer_frame);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&peer_frame, sockets[SERVER], 0));
TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
TEST_ASSERT_TRUE (has_more (sockets[SERVER]));
// Server: Grab the 2nd frame (actual payload).
zmq_msg_t data_frame;
rc = zmq_msg_init (&data_frame);
assert (rc == 0);
rc = zmq_msg_recv (&data_frame, sockets[SERVER], 0);
assert (rc != -1);
assert (zmq_msg_size (&data_frame) == 0);
rc = zmq_msg_close (&data_frame);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&data_frame, sockets[SERVER], 0));
TEST_ASSERT_EQUAL_INT (0, zmq_msg_size (&data_frame));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
// Client: Grab the 1st frame (peer routing id).
rc = zmq_msg_init (&peer_frame);
assert (rc == 0);
rc = zmq_msg_recv (&peer_frame, sockets[CLIENT], 0);
assert (rc != -1);
assert (zmq_msg_size (&peer_frame) > 0);
assert (has_more (sockets[CLIENT]));
rc = zmq_msg_close (&peer_frame);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&peer_frame, sockets[CLIENT], 0));
TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
TEST_ASSERT_TRUE (has_more (sockets[CLIENT]));
// Client: Grab the 2nd frame (actual payload).
rc = zmq_msg_init (&data_frame);
assert (rc == 0);
rc = zmq_msg_recv (&data_frame, sockets[CLIENT], 0);
assert (rc != -1);
assert (zmq_msg_size (&data_frame) == 0);
rc = zmq_msg_close (&data_frame);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&data_frame, sockets[CLIENT], 0));
TEST_ASSERT_EQUAL_INT (0, zmq_msg_size (&data_frame));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
// Send initial message.
char blob_data[256];
size_t blob_size = sizeof (blob_data);
rc =
zmq_getsockopt (sockets[CLIENT], ZMQ_ROUTING_ID, blob_data, &blob_size);
assert (rc != -1);
assert (blob_size > 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (sockets[CLIENT], ZMQ_ROUTING_ID, blob_data, &blob_size));
TEST_ASSERT_GREATER_THAN (0, blob_size);
zmq_msg_t msg;
rc = zmq_msg_init_size (&msg, blob_size);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, blob_size));
memcpy (zmq_msg_data (&msg), blob_data, blob_size);
rc = zmq_msg_send (&msg, sockets[dialog[0].turn], ZMQ_SNDMORE);
assert (rc != -1);
rc = zmq_msg_close (&msg);
assert (rc == 0);
rc = zmq_msg_init_size (&msg, strlen (dialog[0].text));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_send (&msg, sockets[dialog[0].turn], ZMQ_SNDMORE));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_init_size (&msg, strlen (dialog[0].text)));
memcpy (zmq_msg_data (&msg), dialog[0].text, strlen (dialog[0].text));
rc = zmq_msg_send (&msg, sockets[dialog[0].turn], ZMQ_SNDMORE);
assert (rc != -1);
rc = zmq_msg_close (&msg);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_send (&msg, sockets[dialog[0].turn], ZMQ_SNDMORE));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
// TODO: make sure this loop doesn't loop forever if something is wrong
// with the test (or the implementation).
......@@ -173,28 +153,25 @@ int main (int, char **)
{sockets[SERVER], 0, ZMQ_POLLIN, 0},
{sockets[CLIENT], 0, ZMQ_POLLIN, 0},
};
int rc = zmq_poll (items, 2, 100);
assert (rc >= 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (items, 2, 100));
// Check for data received by the server.
if (items[SERVER].revents & ZMQ_POLLIN) {
assert (dialog[step].turn == CLIENT);
TEST_ASSERT_EQUAL_INT (CLIENT, dialog[step].turn);
// Grab the 1st frame (peer routing id).
zmq_msg_t peer_frame;
rc = zmq_msg_init (&peer_frame);
assert (rc == 0);
rc = zmq_msg_recv (&peer_frame, sockets[SERVER], 0);
assert (rc != -1);
assert (zmq_msg_size (&peer_frame) > 0);
assert (has_more (sockets[SERVER]));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_recv (&peer_frame, sockets[SERVER], 0));
TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
TEST_ASSERT_TRUE (has_more (sockets[SERVER]));
// Grab the 2nd frame (actual payload).
zmq_msg_t data_frame;
rc = zmq_msg_init (&data_frame);
assert (rc == 0);
rc = zmq_msg_recv (&data_frame, sockets[SERVER], 0);
assert (rc != -1);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_recv (&data_frame, sockets[SERVER], 0));
// Make sure payload matches what we expect.
const char *const data = (const char *) zmq_msg_data (&data_frame);
......@@ -203,97 +180,90 @@ int main (int, char **)
// should receive it as the last step in the dialogue.
if (size == 0) {
++step;
assert (step == steps);
TEST_ASSERT_EQUAL_INT (steps, step);
} else {
assert (size == strlen (dialog[step].text));
int cmp = memcmp (dialog[step].text, data, size);
assert (cmp == 0);
TEST_ASSERT_EQUAL_INT (strlen (dialog[step].text), size);
TEST_ASSERT_EQUAL_STRING_LEN (dialog[step].text, data, size);
++step;
assert (step < steps);
TEST_ASSERT_LESS_THAN_INT (steps, step);
// Prepare the response.
rc = zmq_msg_close (&data_frame);
assert (rc == 0);
rc =
zmq_msg_init_size (&data_frame, strlen (dialog[step].text));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_init_size (&data_frame, strlen (dialog[step].text)));
memcpy (zmq_msg_data (&data_frame), dialog[step].text,
zmq_msg_size (&data_frame));
// Send the response.
rc = zmq_msg_send (&peer_frame, sockets[SERVER], ZMQ_SNDMORE);
assert (rc != -1);
rc = zmq_msg_send (&data_frame, sockets[SERVER], ZMQ_SNDMORE);
assert (rc != -1);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_send (&peer_frame, sockets[SERVER], ZMQ_SNDMORE));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_send (&data_frame, sockets[SERVER], ZMQ_SNDMORE));
}
// Release resources.
rc = zmq_msg_close (&peer_frame);
assert (rc == 0);
rc = zmq_msg_close (&data_frame);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
}
// Check for data received by the client.
if (items[CLIENT].revents & ZMQ_POLLIN) {
assert (dialog[step].turn == SERVER);
TEST_ASSERT_EQUAL_INT (SERVER, dialog[step].turn);
// Grab the 1st frame (peer routing id).
zmq_msg_t peer_frame;
rc = zmq_msg_init (&peer_frame);
assert (rc == 0);
rc = zmq_msg_recv (&peer_frame, sockets[CLIENT], 0);
assert (rc != -1);
assert (zmq_msg_size (&peer_frame) > 0);
assert (has_more (sockets[CLIENT]));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_recv (&peer_frame, sockets[CLIENT], 0));
TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
TEST_ASSERT_TRUE (has_more (sockets[CLIENT]));
// Grab the 2nd frame (actual payload).
zmq_msg_t data_frame;
rc = zmq_msg_init (&data_frame);
assert (rc == 0);
rc = zmq_msg_recv (&data_frame, sockets[CLIENT], 0);
assert (rc != -1);
assert (zmq_msg_size (&data_frame) > 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_recv (&data_frame, sockets[CLIENT], 0));
TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&data_frame));
// Make sure payload matches what we expect.
const char *const data = (const char *) zmq_msg_data (&data_frame);
const size_t size = zmq_msg_size (&data_frame);
assert (size == strlen (dialog[step].text));
int cmp = memcmp (dialog[step].text, data, size);
assert (cmp == 0);
TEST_ASSERT_EQUAL_INT (strlen (dialog[step].text), size);
TEST_ASSERT_EQUAL_STRING_LEN (dialog[step].text, data, size);
++step;
// Prepare the response (next line in the dialog).
assert (step < steps);
rc = zmq_msg_close (&data_frame);
assert (rc == 0);
rc = zmq_msg_init_size (&data_frame, strlen (dialog[step].text));
assert (rc == 0);
TEST_ASSERT_LESS_THAN_INT (steps, step);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_init_size (&data_frame, strlen (dialog[step].text)));
memcpy (zmq_msg_data (&data_frame), dialog[step].text,
zmq_msg_size (&data_frame));
// Send the response.
rc = zmq_msg_send (&peer_frame, sockets[CLIENT], ZMQ_SNDMORE);
assert (rc != -1);
rc = zmq_msg_send (&data_frame, sockets[CLIENT], ZMQ_SNDMORE);
assert (rc != -1);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_send (&peer_frame, sockets[CLIENT], ZMQ_SNDMORE));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_send (&data_frame, sockets[CLIENT], ZMQ_SNDMORE));
// Release resources.
rc = zmq_msg_close (&peer_frame);
assert (rc == 0);
rc = zmq_msg_close (&data_frame);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
}
}
assert (step == steps);
rc = zmq_close (sockets[CLIENT]);
assert (rc == 0);
rc = zmq_close (sockets[SERVER]);
assert (rc == 0);
rc = zmq_ctx_term (context);
assert (rc == 0);
return 0;
TEST_ASSERT_EQUAL_INT (steps, step);
test_context_socket_close (sockets[CLIENT]);
test_context_socket_close (sockets[SERVER]);
}
int main (int, char **)
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_stream_disconnect);
return UNITY_END ();
}
......@@ -28,6 +28,17 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
// Read one event off the monitor socket; return value and address
// by reference, if not null, and event number by value. Returns -1
......@@ -40,7 +51,7 @@ static int get_monitor_event (void *monitor_, int *value_, char **address_)
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor_, 0) == -1)
return -1; // Interruped, presumably
assert (zmq_msg_more (&msg));
TEST_ASSERT_TRUE (zmq_msg_more (&msg));
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
uint16_t event = *(uint16_t *) (data);
......@@ -51,7 +62,7 @@ static int get_monitor_event (void *monitor_, int *value_, char **address_)
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor_, 0) == -1)
return -1; // Interruped, presumably
assert (!zmq_msg_more (&msg));
TEST_ASSERT_TRUE (!zmq_msg_more (&msg));
if (address_) {
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
......@@ -63,172 +74,132 @@ static int get_monitor_event (void *monitor_, int *value_, char **address_)
return event;
}
static void test_stream_handshake_timeout_accept (void)
static void test_stream_handshake_timeout_accept ()
{
int rc;
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
// Set up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);
// We use this socket in raw mode, to make a connection and send nothing
void *stream = zmq_socket (ctx, ZMQ_STREAM);
assert (stream);
void *stream = test_context_socket (ZMQ_STREAM);
int zero = 0;
rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero)));
// We'll be using this socket to test TCP stream handshake timeout
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
rc = zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
void *dealer = test_context_socket (ZMQ_DEALER);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero)));
int val, tenth = 100;
size_t vsize = sizeof (val);
// check for the expected default handshake timeout value - 30 sec
rc = zmq_getsockopt (dealer, ZMQ_HANDSHAKE_IVL, &val, &vsize);
assert (rc == 0);
assert (vsize == sizeof (val));
assert (val == 30000);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (dealer, ZMQ_HANDSHAKE_IVL, &val, &vsize));
TEST_ASSERT_EQUAL (sizeof (val), vsize);
TEST_ASSERT_EQUAL_INT (30000, val);
// make handshake timeout faster - 1/10 sec
rc = zmq_setsockopt (dealer, ZMQ_HANDSHAKE_IVL, &tenth, sizeof (tenth));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (dealer, ZMQ_HANDSHAKE_IVL, &tenth, sizeof (tenth)));
vsize = sizeof (val);
// make sure zmq_setsockopt changed the value
rc = zmq_getsockopt (dealer, ZMQ_HANDSHAKE_IVL, &val, &vsize);
assert (rc == 0);
assert (vsize == sizeof (val));
assert (val == tenth);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (dealer, ZMQ_HANDSHAKE_IVL, &val, &vsize));
TEST_ASSERT_EQUAL (sizeof (val), vsize);
TEST_ASSERT_EQUAL_INT (tenth, val);
// Create and connect a socket for collecting monitor events on dealer
void *dealer_mon = zmq_socket (ctx, ZMQ_PAIR);
assert (dealer_mon);
void *dealer_mon = test_context_socket (ZMQ_PAIR);
rc = zmq_socket_monitor (dealer, "inproc://monitor-dealer",
ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED
| ZMQ_EVENT_ACCEPTED);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor (
dealer, "inproc://monitor-dealer",
ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED | ZMQ_EVENT_ACCEPTED));
// Connect to the inproc endpoint so we'll get events
rc = zmq_connect (dealer_mon, "inproc://monitor-dealer");
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_connect (dealer_mon, "inproc://monitor-dealer"));
// bind dealer socket to accept connection from non-sending stream socket
rc = zmq_bind (dealer, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (dealer, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
bind_loopback_ipv4 (dealer, my_endpoint, sizeof my_endpoint);
rc = zmq_connect (stream, my_endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (stream, my_endpoint));
// we should get ZMQ_EVENT_ACCEPTED and then ZMQ_EVENT_DISCONNECTED
int event = get_monitor_event (dealer_mon, NULL, NULL);
assert (event == ZMQ_EVENT_ACCEPTED);
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, event);
event = get_monitor_event (dealer_mon, NULL, NULL);
assert (event == ZMQ_EVENT_DISCONNECTED);
rc = zmq_close (dealer);
assert (rc == 0);
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_DISCONNECTED, event);
rc = zmq_close (dealer_mon);
assert (rc == 0);
rc = zmq_close (stream);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
test_context_socket_close (dealer);
test_context_socket_close (dealer_mon);
test_context_socket_close (stream);
}
static void test_stream_handshake_timeout_connect (void)
static void test_stream_handshake_timeout_connect ()
{
int rc;
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
// Set up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);
// We use this socket in raw mode, to accept a connection and send nothing
void *stream = zmq_socket (ctx, ZMQ_STREAM);
assert (stream);
void *stream = test_context_socket (ZMQ_STREAM);
int zero = 0;
rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_bind (stream, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (stream, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero)));
bind_loopback_ipv4 (stream, my_endpoint, sizeof my_endpoint);
// We'll be using this socket to test TCP stream handshake timeout
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
rc = zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
void *dealer = test_context_socket (ZMQ_DEALER);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero)));
int val, tenth = 100;
size_t vsize = sizeof (val);
// check for the expected default handshake timeout value - 30 sec
rc = zmq_getsockopt (dealer, ZMQ_HANDSHAKE_IVL, &val, &vsize);
assert (rc == 0);
assert (vsize == sizeof (val));
assert (val == 30000);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (dealer, ZMQ_HANDSHAKE_IVL, &val, &vsize));
TEST_ASSERT_EQUAL (sizeof (val), vsize);
TEST_ASSERT_EQUAL_INT (30000, val);
// make handshake timeout faster - 1/10 sec
rc = zmq_setsockopt (dealer, ZMQ_HANDSHAKE_IVL, &tenth, sizeof (tenth));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (dealer, ZMQ_HANDSHAKE_IVL, &tenth, sizeof (tenth)));
vsize = sizeof (val);
// make sure zmq_setsockopt changed the value
rc = zmq_getsockopt (dealer, ZMQ_HANDSHAKE_IVL, &val, &vsize);
assert (rc == 0);
assert (vsize == sizeof (val));
assert (val == tenth);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (dealer, ZMQ_HANDSHAKE_IVL, &val, &vsize));
TEST_ASSERT_EQUAL (sizeof (val), vsize);
TEST_ASSERT_EQUAL_INT (tenth, val);
// Create and connect a socket for collecting monitor events on dealer
void *dealer_mon = zmq_socket (ctx, ZMQ_PAIR);
assert (dealer_mon);
void *dealer_mon = test_context_socket (ZMQ_PAIR);
rc = zmq_socket_monitor (dealer, "inproc://monitor-dealer",
ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED
| ZMQ_EVENT_ACCEPTED);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor (
dealer, "inproc://monitor-dealer",
ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED | ZMQ_EVENT_ACCEPTED));
// Connect to the inproc endpoint so we'll get events
rc = zmq_connect (dealer_mon, "inproc://monitor-dealer");
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_connect (dealer_mon, "inproc://monitor-dealer"));
// connect dealer socket to non-sending stream socket
rc = zmq_connect (dealer, my_endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, my_endpoint));
// we should get ZMQ_EVENT_CONNECTED and then ZMQ_EVENT_DISCONNECTED
int event = get_monitor_event (dealer_mon, NULL, NULL);
assert (event == ZMQ_EVENT_CONNECTED);
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_CONNECTED, event);
event = get_monitor_event (dealer_mon, NULL, NULL);
assert (event == ZMQ_EVENT_DISCONNECTED);
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_DISCONNECTED, event);
rc = zmq_close (dealer);
assert (rc == 0);
rc = zmq_close (dealer_mon);
assert (rc == 0);
rc = zmq_close (stream);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
test_context_socket_close (dealer);
test_context_socket_close (dealer_mon);
test_context_socket_close (stream);
}
int main (void)
int main ()
{
setup_test_environment ();
test_stream_handshake_timeout_accept ();
test_stream_handshake_timeout_connect ();
UNITY_BEGIN ();
RUN_TEST (test_stream_handshake_timeout_accept);
RUN_TEST (test_stream_handshake_timeout_connect);
return UNITY_END ();
}
......@@ -29,6 +29,15 @@
#define __STDC_LIMIT_MACROS // to define SIZE_MAX with older compilers
#include "testutil.hpp"
#include "testutil_unity.hpp"
void setUp ()
{
}
void tearDown ()
{
}
void handler (int timer_id_, void *arg_)
{
......@@ -53,193 +62,167 @@ void test_null_timer_pointers ()
{
void *timers = NULL;
int rc = zmq_timers_destroy (&timers);
assert (rc == -1 && errno == EFAULT);
TEST_ASSERT_FAILURE_ERRNO (EFAULT, zmq_timers_destroy (&timers));
// TODO this currently triggers an access violation
#if 0
rc = zmq_timers_destroy (NULL);
assert (rc == -1 && errno == EFAULT);
TEST_ASSERT_FAILURE_ERRNO(EFAULT, zmq_timers_destroy (NULL));
#endif
const size_t dummy_interval = 100;
const int dummy_timer_id = 1;
rc = zmq_timers_add (timers, dummy_interval, &handler, NULL);
assert (rc == -1 && errno == EFAULT);
rc = zmq_timers_add (&timers, dummy_interval, &handler, NULL);
assert (rc == -1 && errno == EFAULT);
rc = zmq_timers_cancel (timers, dummy_timer_id);
assert (rc == -1 && errno == EFAULT);
rc = zmq_timers_cancel (&timers, dummy_timer_id);
assert (rc == -1 && errno == EFAULT);
TEST_ASSERT_FAILURE_ERRNO (
EFAULT, zmq_timers_add (timers, dummy_interval, &handler, NULL));
TEST_ASSERT_FAILURE_ERRNO (
EFAULT, zmq_timers_add (&timers, dummy_interval, &handler, NULL));
rc = zmq_timers_set_interval (timers, dummy_timer_id, dummy_interval);
assert (rc == -1 && errno == EFAULT);
TEST_ASSERT_FAILURE_ERRNO (EFAULT,
zmq_timers_cancel (timers, dummy_timer_id));
TEST_ASSERT_FAILURE_ERRNO (EFAULT,
zmq_timers_cancel (&timers, dummy_timer_id));
rc = zmq_timers_set_interval (&timers, dummy_timer_id, dummy_interval);
assert (rc == -1 && errno == EFAULT);
TEST_ASSERT_FAILURE_ERRNO (
EFAULT, zmq_timers_set_interval (timers, dummy_timer_id, dummy_interval));
TEST_ASSERT_FAILURE_ERRNO (
EFAULT,
zmq_timers_set_interval (&timers, dummy_timer_id, dummy_interval));
rc = zmq_timers_reset (timers, dummy_timer_id);
assert (rc == -1 && errno == EFAULT);
TEST_ASSERT_FAILURE_ERRNO (EFAULT,
zmq_timers_reset (timers, dummy_timer_id));
TEST_ASSERT_FAILURE_ERRNO (EFAULT,
zmq_timers_reset (&timers, dummy_timer_id));
rc = zmq_timers_reset (&timers, dummy_timer_id);
assert (rc == -1 && errno == EFAULT);
TEST_ASSERT_FAILURE_ERRNO (EFAULT, zmq_timers_timeout (timers));
TEST_ASSERT_FAILURE_ERRNO (EFAULT, zmq_timers_timeout (&timers));
rc = zmq_timers_timeout (timers);
assert (rc == -1 && errno == EFAULT);
rc = zmq_timers_timeout (&timers);
assert (rc == -1 && errno == EFAULT);
rc = zmq_timers_execute (timers);
assert (rc == -1 && errno == EFAULT);
rc = zmq_timers_execute (&timers);
assert (rc == -1 && errno == EFAULT);
TEST_ASSERT_FAILURE_ERRNO (EFAULT, zmq_timers_execute (timers));
TEST_ASSERT_FAILURE_ERRNO (EFAULT, zmq_timers_execute (&timers));
}
void test_corner_cases ()
{
void *timers = zmq_timers_new ();
assert (timers);
TEST_ASSERT_NOT_NULL (timers);
const size_t dummy_interval = SIZE_MAX;
const int dummy_timer_id = 1;
// attempt to cancel non-existent timer
int rc = zmq_timers_cancel (timers, dummy_timer_id);
assert (rc == -1 && errno == EINVAL);
TEST_ASSERT_FAILURE_ERRNO (EINVAL,
zmq_timers_cancel (timers, dummy_timer_id));
// attempt to set interval of non-existent timer
rc = zmq_timers_set_interval (timers, dummy_timer_id, dummy_interval);
assert (rc == -1 && errno == EINVAL);
TEST_ASSERT_FAILURE_ERRNO (
EINVAL, zmq_timers_set_interval (timers, dummy_timer_id, dummy_interval));
// attempt to reset non-existent timer
rc = zmq_timers_reset (timers, dummy_timer_id);
assert (rc == -1 && errno == EINVAL);
TEST_ASSERT_FAILURE_ERRNO (EINVAL,
zmq_timers_reset (timers, dummy_timer_id));
// attempt to add NULL handler
rc = zmq_timers_add (timers, dummy_interval, NULL, NULL);
assert (rc == -1 && errno == EFAULT);
TEST_ASSERT_FAILURE_ERRNO (
EFAULT, zmq_timers_add (timers, dummy_interval, NULL, NULL));
int timer_id = zmq_timers_add (timers, dummy_interval, handler, NULL);
assert (timer_id != -1);
const int timer_id = TEST_ASSERT_SUCCESS_ERRNO (
zmq_timers_add (timers, dummy_interval, handler, NULL));
// attempt to cancel timer twice
// TODO should this case really be an error? canceling twice could be allowed
rc = zmq_timers_cancel (timers, timer_id);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_cancel (timers, timer_id));
rc = zmq_timers_cancel (timers, timer_id);
assert (rc == -1 && errno == EINVAL);
TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_timers_cancel (timers, timer_id));
// timeout without any timers active
rc = zmq_timers_timeout (timers);
assert (rc == -1);
TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_timers_timeout (timers));
rc = zmq_timers_destroy (&timers);
assert (rc == 0);
// cleanup
TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_destroy (&timers));
}
int main (void)
void test_timers ()
{
setup_test_environment ();
void *timers = zmq_timers_new ();
assert (timers);
TEST_ASSERT_NOT_NULL (timers);
bool timer_invoked = false;
const unsigned long full_timeout = 100;
void *const stopwatch = zmq_stopwatch_start ();
int timer_id =
zmq_timers_add (timers, full_timeout, handler, &timer_invoked);
assert (timer_id);
const int timer_id = TEST_ASSERT_SUCCESS_ERRNO (
zmq_timers_add (timers, full_timeout, handler, &timer_invoked));
// Timer should not have been invoked yet
int rc = zmq_timers_execute (timers);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_execute (timers));
#ifdef ZMQ_BUILD_DRAFT_API
if (zmq_stopwatch_intermediate (stopwatch) < full_timeout) {
assert (!timer_invoked);
TEST_ASSERT_FALSE (timer_invoked);
}
#endif
// Wait half the time and check again
long timeout = zmq_timers_timeout (timers);
assert (rc != -1);
long timeout = TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_timeout (timers));
msleep (timeout / 2);
rc = zmq_timers_execute (timers);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_execute (timers));
#ifdef ZMQ_BUILD_DRAFT_API
if (zmq_stopwatch_intermediate (stopwatch) < full_timeout) {
assert (!timer_invoked);
TEST_ASSERT_FALSE (timer_invoked);
}
#endif
// Wait until the end
rc = sleep_and_execute (timers);
assert (rc == 0);
assert (timer_invoked);
TEST_ASSERT_SUCCESS_ERRNO (sleep_and_execute (timers));
TEST_ASSERT_TRUE (timer_invoked);
timer_invoked = false;
// Wait half the time and check again
timeout = zmq_timers_timeout (timers);
assert (rc != -1);
timeout = TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_timeout (timers));
msleep (timeout / 2);
rc = zmq_timers_execute (timers);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_execute (timers));
#ifdef ZMQ_BUILD_DRAFT_API
if (zmq_stopwatch_intermediate (stopwatch) < 2 * full_timeout) {
assert (!timer_invoked);
TEST_ASSERT_FALSE (timer_invoked);
}
#endif
// Reset timer and wait half of the time left
rc = zmq_timers_reset (timers, timer_id);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_reset (timers, timer_id));
msleep (timeout / 2);
rc = zmq_timers_execute (timers);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_execute (timers));
if (zmq_stopwatch_stop (stopwatch) < 2 * full_timeout) {
assert (!timer_invoked);
TEST_ASSERT_FALSE (timer_invoked);
}
// Wait until the end
rc = sleep_and_execute (timers);
assert (rc == 0);
assert (timer_invoked);
TEST_ASSERT_SUCCESS_ERRNO (sleep_and_execute (timers));
TEST_ASSERT_TRUE (timer_invoked);
timer_invoked = false;
// reschedule
rc = zmq_timers_set_interval (timers, timer_id, 50);
assert (rc == 0);
rc = sleep_and_execute (timers);
assert (rc == 0);
assert (timer_invoked);
TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_set_interval (timers, timer_id, 50));
TEST_ASSERT_SUCCESS_ERRNO (sleep_and_execute (timers));
TEST_ASSERT_TRUE (timer_invoked);
timer_invoked = false;
// cancel timer
timeout = zmq_timers_timeout (timers);
assert (rc != -1);
rc = zmq_timers_cancel (timers, timer_id);
assert (rc == 0);
timeout = TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_timeout (timers));
TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_cancel (timers, timer_id));
msleep (timeout * 2);
rc = zmq_timers_execute (timers);
assert (rc == 0);
assert (!timer_invoked);
TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_execute (timers));
TEST_ASSERT_FALSE (timer_invoked);
rc = zmq_timers_destroy (&timers);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_timers_destroy (&timers));
}
test_null_timer_pointers ();
test_corner_cases ();
int main ()
{
setup_test_environment ();
return 0;
UNITY_BEGIN ();
RUN_TEST (test_timers);
RUN_TEST (test_null_timer_pointers);
RUN_TEST (test_corner_cases);
return UNITY_END ();
}
......@@ -28,209 +28,261 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
#if !defined(ZMQ_HAVE_WINDOWS)
#include <netdb.h>
#include <unity.h>
uint16_t
pre_allocate_sock (void *zmq_socket_, const char *address_, const char *port_)
void setUp ()
{
struct addrinfo *addr, hint;
hint.ai_flags = 0;
hint.ai_family = AF_INET;
hint.ai_socktype = SOCK_STREAM;
hint.ai_protocol = IPPROTO_TCP;
hint.ai_addrlen = 0;
hint.ai_canonname = NULL;
hint.ai_addr = NULL;
hint.ai_next = NULL;
int rc = getaddrinfo (address_, port_, &hint, &addr);
assert (rc == 0);
int s_pre = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
assert (s_pre != -1);
int flag = 1;
rc = setsockopt (s_pre, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
assert (rc == 0);
rc = bind (s_pre, addr->ai_addr, addr->ai_addrlen);
assert (rc == 0);
rc = listen (s_pre, SOMAXCONN);
assert (rc == 0);
rc = zmq_setsockopt (zmq_socket_, ZMQ_USE_FD, &s_pre, sizeof (s_pre));
assert (rc == 0);
struct sockaddr_in sin;
socklen_t len = sizeof (sin);
rc = getsockname (s_pre, (struct sockaddr *) &sin, &len);
assert (rc != -1);
freeaddrinfo (addr);
return ntohs (sin.sin_port);
setup_test_context ();
}
void test_req_rep ()
void tearDown ()
{
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
teardown_test_context ();
}
uint16_t port = pre_allocate_sock (sb, "127.0.0.1", "0");
sprintf (my_endpoint, "tcp://127.0.0.1:%u", port);
int rc = zmq_bind (sb, my_endpoint);
assert (rc == 0);
#if !defined(ZMQ_HAVE_WINDOWS)
#include <sys/socket.h>
#include <sys/un.h>
#include <netdb.h>
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_connect (sc, my_endpoint);
assert (rc == 0);
int setup_socket_and_set_fd (void *zmq_socket_,
int af_,
int protocol_,
const sockaddr *addr_,
size_t addr_len_)
{
const int s_pre =
TEST_ASSERT_SUCCESS_ERRNO (socket (af_, SOCK_STREAM, protocol_));
bounce (sb, sc);
if (af_ == AF_INET) {
int flag = 1;
TEST_ASSERT_SUCCESS_ERRNO (
setsockopt (s_pre, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)));
}
rc = zmq_close (sc);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (bind (s_pre, addr_, addr_len_));
TEST_ASSERT_SUCCESS_ERRNO (listen (s_pre, SOMAXCONN));
rc = zmq_close (sb);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (zmq_socket_, ZMQ_USE_FD, &s_pre, sizeof (s_pre)));
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return s_pre;
}
void test_pair ()
typedef void (*pre_allocate_sock_fun_t) (void *, char *);
void setup_socket_pair (pre_allocate_sock_fun_t pre_allocate_sock_fun_,
int bind_socket_type_,
int connect_socket_type_,
void **out_sb_,
void **out_sc_)
{
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
*out_sb_ = test_context_socket (bind_socket_type_);
void *sb = zmq_socket (ctx, ZMQ_PAIR);
assert (sb);
char my_endpoint[MAX_SOCKET_STRING];
pre_allocate_sock_fun_ (*out_sb_, my_endpoint);
uint16_t port = pre_allocate_sock (sb, "127.0.0.1", "0");
sprintf (my_endpoint, "tcp://127.0.0.1:%u", port);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (*out_sb_, my_endpoint));
int rc = zmq_bind (sb, my_endpoint);
assert (rc == 0);
*out_sc_ = test_context_socket (connect_socket_type_);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (*out_sc_, my_endpoint));
}
void *sc = zmq_socket (ctx, ZMQ_PAIR);
assert (sc);
rc = zmq_connect (sc, my_endpoint);
assert (rc == 0);
void test_socket_pair (pre_allocate_sock_fun_t pre_allocate_sock_fun_,
int bind_socket_type_,
int connect_socket_type_)
{
void *sb, *sc;
setup_socket_pair (pre_allocate_sock_fun_, bind_socket_type_,
connect_socket_type_, &sb, &sc);
bounce (sb, sc);
rc = zmq_close (sc);
assert (rc == 0);
test_context_socket_close (sc);
test_context_socket_close (sb);
}
rc = zmq_close (sb);
assert (rc == 0);
void test_req_rep (pre_allocate_sock_fun_t pre_allocate_sock_fun_)
{
test_socket_pair (pre_allocate_sock_fun_, ZMQ_REP, ZMQ_REQ);
}
rc = zmq_ctx_term (ctx);
assert (rc == 0);
void test_pair (pre_allocate_sock_fun_t pre_allocate_sock_fun_)
{
test_socket_pair (pre_allocate_sock_fun_, ZMQ_PAIR, ZMQ_PAIR);
}
void test_client_server ()
void test_client_server (pre_allocate_sock_fun_t pre_allocate_sock_fun_)
{
#if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_SERVER);
assert (sb);
uint16_t port = pre_allocate_sock (sb, "127.0.0.1", "0");
sprintf (my_endpoint, "tcp://127.0.0.1:%u", port);
int rc = zmq_bind (sb, my_endpoint);
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_CLIENT);
assert (sc);
rc = zmq_connect (sc, my_endpoint);
assert (rc == 0);
void *sb, *sc;
setup_socket_pair (pre_allocate_sock_fun_, ZMQ_SERVER, ZMQ_CLIENT, &sb,
&sc);
zmq_msg_t msg;
rc = zmq_msg_init_size (&msg, 1);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, 1));
char *data = (char *) zmq_msg_data (&msg);
data[0] = 1;
rc = zmq_msg_send (&msg, sc, ZMQ_SNDMORE);
assert (rc == -1);
int rc = zmq_msg_send (&msg, sc, ZMQ_SNDMORE);
// TODO which error code is expected?
TEST_ASSERT_EQUAL_INT (-1, rc);
rc = zmq_msg_send (&msg, sc, 0);
assert (rc == 1);
TEST_ASSERT_EQUAL_INT (1, rc);
rc = zmq_msg_init (&msg);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
rc = zmq_msg_recv (&msg, sb, 0);
assert (rc == 1);
TEST_ASSERT_EQUAL_INT (1, rc);
uint32_t routing_id = zmq_msg_routing_id (&msg);
assert (routing_id != 0);
TEST_ASSERT_NOT_EQUAL (0, routing_id);
rc = zmq_msg_close (&msg);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
rc = zmq_msg_init_size (&msg, 1);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, 1));
data = (char *) zmq_msg_data (&msg);
data[0] = 2;
rc = zmq_msg_set_routing_id (&msg, routing_id);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_set_routing_id (&msg, routing_id));
rc = zmq_msg_send (&msg, sb, ZMQ_SNDMORE);
assert (rc == -1);
// TODO which error code is expected?
TEST_ASSERT_EQUAL_INT (-1, rc);
rc = zmq_msg_send (&msg, sb, 0);
assert (rc == 1);
TEST_ASSERT_EQUAL_INT (1, rc);
rc = zmq_msg_recv (&msg, sc, 0);
assert (rc == 1);
TEST_ASSERT_EQUAL_INT (1, rc);
routing_id = zmq_msg_routing_id (&msg);
assert (routing_id == 0);
TEST_ASSERT_EQUAL_INT (0, routing_id);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
test_context_socket_close (sc);
test_context_socket_close (sb);
#endif
}
uint16_t pre_allocate_sock_tcp_int (void *zmq_socket_,
const char *address_,
const char *port_)
{
struct addrinfo *addr, hint;
hint.ai_flags = 0;
hint.ai_family = AF_INET;
hint.ai_socktype = SOCK_STREAM;
hint.ai_protocol = IPPROTO_TCP;
hint.ai_addrlen = 0;
hint.ai_canonname = NULL;
hint.ai_addr = NULL;
hint.ai_next = NULL;
TEST_ASSERT_SUCCESS_ERRNO (getaddrinfo (address_, port_, &hint, &addr));
const int s_pre = setup_socket_and_set_fd (
zmq_socket_, AF_INET, IPPROTO_TCP, addr->ai_addr, addr->ai_addrlen);
struct sockaddr_in sin;
socklen_t len = sizeof (sin);
TEST_ASSERT_SUCCESS_ERRNO (
getsockname (s_pre, (struct sockaddr *) &sin, &len));
freeaddrinfo (addr);
return ntohs (sin.sin_port);
}
void pre_allocate_sock_tcp (void *socket_, char *my_endpoint_)
{
const uint16_t port = pre_allocate_sock_tcp_int (socket_, "127.0.0.1", "0");
sprintf (my_endpoint_, "tcp://127.0.0.1:%u", port);
}
void test_req_rep_tcp ()
{
test_req_rep (pre_allocate_sock_tcp);
}
rc = zmq_msg_close (&msg);
assert (rc == 0);
void test_pair_tcp ()
{
test_pair (pre_allocate_sock_tcp);
}
void test_client_server_tcp ()
{
#if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
test_client_server (pre_allocate_sock_tcp);
#endif
}
void pre_allocate_sock_ipc_int (void *zmq_socket_, const char *path_)
{
struct sockaddr_un addr;
addr.sun_family = AF_UNIX;
strcpy (addr.sun_path, path_);
// TODO check return value of unlink
unlink (path_);
setup_socket_and_set_fd (zmq_socket_, AF_UNIX, 0, (struct sockaddr *) &addr,
sizeof (struct sockaddr_un));
}
void pre_allocate_sock_ipc (void *sb_, char *my_endpoint_)
{
pre_allocate_sock_ipc_int (sb_, "/tmp/test_use_fd_ipc");
strcpy (my_endpoint_, "ipc:///tmp/test_use_fd_ipc");
}
rc = zmq_close (sc);
assert (rc == 0);
void test_req_rep_ipc ()
{
test_req_rep (pre_allocate_sock_ipc);
rc = zmq_close (sb);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (unlink ("/tmp/test_use_fd_ipc"));
}
rc = zmq_ctx_term (ctx);
assert (rc == 0);
void test_pair_ipc ()
{
test_pair (pre_allocate_sock_ipc);
TEST_ASSERT_SUCCESS_ERRNO (unlink ("/tmp/test_use_fd_ipc"));
}
void test_client_server_ipc ()
{
#if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
test_client_server (pre_allocate_sock_ipc);
TEST_ASSERT_SUCCESS_ERRNO (unlink ("/tmp/test_use_fd_ipc"));
#endif
}
int main (void)
int main ()
{
setup_test_environment ();
test_req_rep ();
test_pair ();
test_client_server ();
UNITY_BEGIN ();
RUN_TEST (test_req_rep_tcp);
RUN_TEST (test_pair_tcp);
RUN_TEST (test_client_server_tcp);
return 0;
RUN_TEST (test_req_rep_ipc);
RUN_TEST (test_pair_ipc);
RUN_TEST (test_client_server_ipc);
return UNITY_END ();
}
#else
int main (void)
int main ()
{
return 0;
}
......
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#if !defined(ZMQ_HAVE_WINDOWS)
#include <sys/socket.h>
#include <sys/un.h>
void pre_allocate_sock (void *zmq_socket_, const char *path_)
{
struct sockaddr_un addr;
addr.sun_family = AF_UNIX;
strcpy (addr.sun_path, path_);
unlink (path_);
int s_pre = socket (AF_UNIX, SOCK_STREAM, 0);
assert (s_pre != -1);
int rc =
bind (s_pre, (struct sockaddr *) &addr, sizeof (struct sockaddr_un));
assert (rc == 0);
rc = listen (s_pre, SOMAXCONN);
assert (rc == 0);
rc = zmq_setsockopt (zmq_socket_, ZMQ_USE_FD, &s_pre, sizeof (s_pre));
assert (rc == 0);
}
void test_req_rep ()
{
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
pre_allocate_sock (sb, "/tmp/test_use_fd_ipc");
int rc = zmq_bind (sb, "ipc:///tmp/test_use_fd_ipc");
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_connect (sc, "ipc:///tmp/test_use_fd_ipc");
assert (rc == 0);
bounce (sb, sc);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
rc = unlink ("/tmp/test_use_fd_ipc");
assert (rc == 0);
}
void test_pair ()
{
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_PAIR);
assert (sb);
pre_allocate_sock (sb, "/tmp/test_use_fd_ipc");
int rc = zmq_bind (sb, "ipc:///tmp/test_use_fd_ipc");
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_PAIR);
assert (sc);
rc = zmq_connect (sc, "ipc:///tmp/test_use_fd_ipc");
assert (rc == 0);
bounce (sb, sc);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
rc = unlink ("/tmp/test_use_fd_ipc");
assert (rc == 0);
}
void test_client_server ()
{
#if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_SERVER);
assert (sb);
pre_allocate_sock (sb, "/tmp/test_use_fd_ipc");
int rc = zmq_bind (sb, "ipc:///tmp/test_use_fd_ipc");
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_CLIENT);
assert (sc);
rc = zmq_connect (sc, "ipc:///tmp/test_use_fd_ipc");
assert (rc == 0);
zmq_msg_t msg;
rc = zmq_msg_init_size (&msg, 1);
assert (rc == 0);
char *data = (char *) zmq_msg_data (&msg);
data[0] = 1;
rc = zmq_msg_send (&msg, sc, ZMQ_SNDMORE);
assert (rc == -1);
rc = zmq_msg_send (&msg, sc, 0);
assert (rc == 1);
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_msg_recv (&msg, sb, 0);
assert (rc == 1);
uint32_t routing_id = zmq_msg_routing_id (&msg);
assert (routing_id != 0);
rc = zmq_msg_close (&msg);
assert (rc == 0);
rc = zmq_msg_init_size (&msg, 1);
assert (rc == 0);
data = (char *) zmq_msg_data (&msg);
data[0] = 2;
rc = zmq_msg_set_routing_id (&msg, routing_id);
assert (rc == 0);
rc = zmq_msg_send (&msg, sb, ZMQ_SNDMORE);
assert (rc == -1);
rc = zmq_msg_send (&msg, sb, 0);
assert (rc == 1);
rc = zmq_msg_recv (&msg, sc, 0);
assert (rc == 1);
routing_id = zmq_msg_routing_id (&msg);
assert (routing_id == 0);
rc = zmq_msg_close (&msg);
assert (rc == 0);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
rc = unlink ("/tmp/test_use_fd_ipc");
assert (rc == 0);
#endif
}
int main (void)
{
setup_test_environment ();
test_req_rep ();
test_pair ();
test_client_server ();
return 0;
}
#else
int main (void)
{
return 0;
}
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment