Commit 93f99d09 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #1804 from bluca/test_fixes

Various test fixes
parents 18dcc32a facb5121
...@@ -28,7 +28,7 @@ autom4te.cache ...@@ -28,7 +28,7 @@ autom4te.cache
curve_keygen curve_keygen
test_heartbeats test_heartbeats
test_msg_ffn test_msg_ffn
test_socketopt_hwm test_sockopt_hwm
test_resource test_resource
test_ipc_wildcard test_ipc_wildcard
test_stream_empty test_stream_empty
......
...@@ -11,3 +11,8 @@ Note that testutil.hpp includes platform.h. Do not include it yourself as it cha ...@@ -11,3 +11,8 @@ Note that testutil.hpp includes platform.h. Do not include it yourself as it cha
All sources must contain the correct header. Please copy from test_system.cpp if you're not certain. All sources must contain the correct header. Please copy from test_system.cpp if you're not certain.
Please use only ANSI C99 in test cases, no C++. This is to make the code more reusable. Please use only ANSI C99 in test cases, no C++. This is to make the code more reusable.
On many slower environments, like embedded systems, VMs or CI systems, test might
fail because it takes time for sockets to settle after a connect. If you need
to add a sleep, please be consistent with all the other tests and use:
msleep (SETTLE_TIME);
\ No newline at end of file
...@@ -86,6 +86,8 @@ int main (void) ...@@ -86,6 +86,8 @@ int main (void)
assert (WEXITSTATUS (child_status) == 0); assert (WEXITSTATUS (child_status) == 0);
break; break;
} }
zmq_close (pull);
zmq_ctx_term (ctx);
exit (0); exit (0);
} }
#endif #endif
......
...@@ -39,7 +39,7 @@ get_monitor_event (void *monitor) ...@@ -39,7 +39,7 @@ get_monitor_event (void *monitor)
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init (&msg); zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1) { if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1) {
msleep(150); msleep (SETTLE_TIME);
continue; // Interruped, presumably continue; // Interruped, presumably
} }
assert (zmq_msg_more (&msg)); assert (zmq_msg_more (&msg));
...@@ -251,7 +251,7 @@ test_heartbeat_ttl (void) ...@@ -251,7 +251,7 @@ test_heartbeat_ttl (void)
rc = get_monitor_event(server_mon); rc = get_monitor_event(server_mon);
assert(rc == ZMQ_EVENT_ACCEPTED); assert(rc == ZMQ_EVENT_ACCEPTED);
msleep(100); msleep (SETTLE_TIME);
// We should have been disconnected // We should have been disconnected
rc = get_monitor_event(server_mon); rc = get_monitor_event(server_mon);
...@@ -291,7 +291,7 @@ test_heartbeat_notimeout (int is_curve) ...@@ -291,7 +291,7 @@ test_heartbeat_notimeout (int is_curve)
rc = zmq_connect(client, "tcp://127.0.0.1:5556"); rc = zmq_connect(client, "tcp://127.0.0.1:5556");
// Give it a sec to connect and handshake // Give it a sec to connect and handshake
msleep(100); msleep (SETTLE_TIME);
// By now everything should report as connected // By now everything should report as connected
rc = get_monitor_event(server_mon); rc = get_monitor_event(server_mon);
......
...@@ -180,7 +180,7 @@ void test_reset_hwm () ...@@ -180,7 +180,7 @@ void test_reset_hwm ()
rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0); rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0);
assert (rc == 0); assert (rc == 0);
msleep (100); msleep (SETTLE_TIME);
// Send messages // Send messages
int send_count = 0; int send_count = 0;
...@@ -188,7 +188,7 @@ void test_reset_hwm () ...@@ -188,7 +188,7 @@ void test_reset_hwm ()
++send_count; ++send_count;
assert (first_count == send_count); assert (first_count == send_count);
msleep (100); msleep (SETTLE_TIME);
// Now receive all sent messages // Now receive all sent messages
int recv_count = 0; int recv_count = 0;
...@@ -198,7 +198,7 @@ void test_reset_hwm () ...@@ -198,7 +198,7 @@ void test_reset_hwm ()
} }
assert (first_count == recv_count); assert (first_count == recv_count);
msleep (100); msleep (SETTLE_TIME);
// Send messages // Send messages
send_count = 0; send_count = 0;
...@@ -206,7 +206,7 @@ void test_reset_hwm () ...@@ -206,7 +206,7 @@ void test_reset_hwm ()
++send_count; ++send_count;
assert (second_count == send_count); assert (second_count == send_count);
msleep (100); msleep (SETTLE_TIME);
// Now receive all sent messages // Now receive all sent messages
recv_count = 0; recv_count = 0;
......
...@@ -67,6 +67,8 @@ int main (void) ...@@ -67,6 +67,8 @@ int main (void)
rc = zmq_connect (from, "tcp://localhost:6555"); rc = zmq_connect (from, "tcp://localhost:6555");
assert (rc == 0); assert (rc == 0);
msleep (SETTLE_TIME);
// We send 10 messages, 5 should just get stuck in the queue // We send 10 messages, 5 should just get stuck in the queue
// for the not-yet-connected pipe // for the not-yet-connected pipe
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
......
...@@ -57,6 +57,7 @@ int main (void) { ...@@ -57,6 +57,7 @@ int main (void) {
zmq_msg_t msg; zmq_msg_t msg;
char hint[5]; char hint[5];
char data[255]; char data[255];
memset(data, 0, 255);
memcpy(data, (void *) "data", 4); memcpy(data, (void *) "data", 4);
memcpy(hint, (void *) "hint", 4); memcpy(hint, (void *) "hint", 4);
rc = zmq_msg_init_data(&msg, (void *)data, 255, ffn, (void*)hint); rc = zmq_msg_init_data(&msg, (void *)data, 255, ffn, (void*)hint);
...@@ -64,7 +65,7 @@ int main (void) { ...@@ -64,7 +65,7 @@ int main (void) {
rc = zmq_msg_close(&msg); rc = zmq_msg_close(&msg);
assert (rc == 0); assert (rc == 0);
msleep(50); msleep (SETTLE_TIME);
assert (memcmp(hint, "freed", 5) == 0); assert (memcmp(hint, "freed", 5) == 0);
memcpy(hint, (void *) "hint", 4); memcpy(hint, (void *) "hint", 4);
...@@ -80,7 +81,7 @@ int main (void) { ...@@ -80,7 +81,7 @@ int main (void) {
rc = zmq_msg_close(&msg); rc = zmq_msg_close(&msg);
assert (rc == 0); assert (rc == 0);
msleep(50); msleep (SETTLE_TIME);
assert (memcmp(hint, "freed", 5) == 0); assert (memcmp(hint, "freed", 5) == 0);
memcpy(hint, (void *) "hint", 4); memcpy(hint, (void *) "hint", 4);
...@@ -94,9 +95,9 @@ int main (void) { ...@@ -94,9 +95,9 @@ int main (void) {
assert (rc > -1); assert (rc > -1);
rc = zmq_recv(router, buf, 255, 0); rc = zmq_recv(router, buf, 255, 0);
assert (rc == 255); assert (rc == 255);
assert (memcmp(data, buf, 5) == 0); assert (memcmp(data, buf, 4) == 0);
msleep(50); msleep (SETTLE_TIME);
assert (memcmp(hint, "freed", 5) == 0); assert (memcmp(hint, "freed", 5) == 0);
memcpy(hint, (void *) "hint", 4); memcpy(hint, (void *) "hint", 4);
rc = zmq_msg_close(&msg); rc = zmq_msg_close(&msg);
...@@ -115,13 +116,13 @@ int main (void) { ...@@ -115,13 +116,13 @@ int main (void) {
assert (rc > -1); assert (rc > -1);
rc = zmq_recv(router, buf, 255, 0); rc = zmq_recv(router, buf, 255, 0);
assert (rc == 255); assert (rc == 255);
assert (memcmp(data, buf, 5) == 0); assert (memcmp(data, buf, 4) == 0);
rc = zmq_msg_close(&msg2); rc = zmq_msg_close(&msg2);
assert (rc == 0); assert (rc == 0);
rc = zmq_msg_close(&msg); rc = zmq_msg_close(&msg);
assert (rc == 0); assert (rc == 0);
msleep(50); msleep (SETTLE_TIME);
assert (memcmp(hint, "freed", 5) == 0); assert (memcmp(hint, "freed", 5) == 0);
memcpy(hint, (void *) "hint", 4); memcpy(hint, (void *) "hint", 4);
......
...@@ -96,7 +96,7 @@ int main (void) ...@@ -96,7 +96,7 @@ int main (void)
rc = zmq_connect (publisher, "tcp://127.0.0.1:15564"); rc = zmq_connect (publisher, "tcp://127.0.0.1:15564");
assert (rc == 0); assert (rc == 0);
msleep (50); msleep (SETTLE_TIME);
rc = zmq_send (publisher, "This is a test", 14, 0); rc = zmq_send (publisher, "This is a test", 14, 0);
assert (rc == 14); assert (rc == 14);
......
...@@ -117,7 +117,7 @@ int main (void) ...@@ -117,7 +117,7 @@ int main (void)
rc = zmq_connect (dish, "tcp://127.0.0.1:5556"); rc = zmq_connect (dish, "tcp://127.0.0.1:5556");
assert (rc == 0); assert (rc == 0);
zmq_sleep (1); msleep (SETTLE_TIME);
zmq_msg_t msg; zmq_msg_t msg;
......
...@@ -103,8 +103,7 @@ int main (void) ...@@ -103,8 +103,7 @@ int main (void)
s_send_seq (rep [3], "BAD", SEQ_END); s_send_seq (rep [3], "BAD", SEQ_END);
// Wait for message to be there. // Wait for message to be there.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
// Without receiving that reply, send another request on the REQ socket // Without receiving that reply, send another request on the REQ socket
s_send_seq (req, "I", SEQ_END); s_send_seq (req, "I", SEQ_END);
...@@ -127,8 +126,7 @@ int main (void) ...@@ -127,8 +126,7 @@ int main (void)
close_zero_linger (rep [peer]); close_zero_linger (rep [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
rc = zmq_ctx_term (ctx); rc = zmq_ctx_term (ctx);
assert (rc == 0); assert (rc == 0);
......
...@@ -83,6 +83,7 @@ int main (void) ...@@ -83,6 +83,7 @@ int main (void)
int i; int i;
const int BUF_SIZE = 65536; const int BUF_SIZE = 65536;
char buf[BUF_SIZE]; char buf[BUF_SIZE];
memset(buf, 0, BUF_SIZE);
// Send first batch of messages // Send first batch of messages
for(i = 0; i < 100000; ++i) { for(i = 0; i < 100000; ++i) {
if (TRACE_ENABLED) fprintf(stderr, "Sending message %d ...\n", i); if (TRACE_ENABLED) fprintf(stderr, "Sending message %d ...\n", i);
......
...@@ -150,7 +150,7 @@ void test_decrease_when_full() ...@@ -150,7 +150,7 @@ void test_decrease_when_full()
assert(read_count == 101); assert(read_count == 101);
// Give io thread some time to catch up // Give io thread some time to catch up
msleep(10); msleep (SETTLE_TIME);
// Fill up to new hwm // Fill up to new hwm
send_count = 0; send_count = 0;
......
...@@ -55,8 +55,7 @@ void test_round_robin_out (void *ctx) ...@@ -55,8 +55,7 @@ void test_round_robin_out (void *ctx)
} }
// Wait for connections. // Wait for connections.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
// Send all requests // Send all requests
for (size_t i = 0; i < services; ++i) for (size_t i = 0; i < services; ++i)
...@@ -78,8 +77,7 @@ void test_round_robin_out (void *ctx) ...@@ -78,8 +77,7 @@ void test_round_robin_out (void *ctx)
close_zero_linger (rep [peer]); close_zero_linger (rep [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_fair_queue_in (void *ctx) void test_fair_queue_in (void *ctx)
...@@ -122,8 +120,7 @@ void test_fair_queue_in (void *ctx) ...@@ -122,8 +120,7 @@ void test_fair_queue_in (void *ctx)
s_send_seq (senders [peer], "B", SEQ_END); s_send_seq (senders [peer], "B", SEQ_END);
// Wait for data. // Wait for data.
rc = zmq_poll (0, 0, 50); msleep (SETTLE_TIME);
assert (rc == 0);
// handle the requests // handle the requests
for (size_t peer = 0; peer < services; ++peer) for (size_t peer = 0; peer < services; ++peer)
...@@ -138,8 +135,7 @@ void test_fair_queue_in (void *ctx) ...@@ -138,8 +135,7 @@ void test_fair_queue_in (void *ctx)
close_zero_linger (senders [peer]); close_zero_linger (senders [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_destroy_queue_on_disconnect (void *ctx) void test_destroy_queue_on_disconnect (void *ctx)
...@@ -201,8 +197,7 @@ void test_destroy_queue_on_disconnect (void *ctx) ...@@ -201,8 +197,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
close_zero_linger (B); close_zero_linger (B);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_block_on_send_no_peers (void *ctx) void test_block_on_send_no_peers (void *ctx)
......
...@@ -55,8 +55,7 @@ void test_push_round_robin_out (void *ctx) ...@@ -55,8 +55,7 @@ void test_push_round_robin_out (void *ctx)
} }
// Wait for connections. // Wait for connections.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
// Send 2N messages // Send 2N messages
for (size_t peer = 0; peer < services; ++peer) for (size_t peer = 0; peer < services; ++peer)
...@@ -76,8 +75,7 @@ void test_push_round_robin_out (void *ctx) ...@@ -76,8 +75,7 @@ void test_push_round_robin_out (void *ctx)
close_zero_linger (pulls [peer]); close_zero_linger (pulls [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_pull_fair_queue_in (void *ctx) void test_pull_fair_queue_in (void *ctx)
...@@ -100,8 +98,7 @@ void test_pull_fair_queue_in (void *ctx) ...@@ -100,8 +98,7 @@ void test_pull_fair_queue_in (void *ctx)
} }
// Wait for connections. // Wait for connections.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
int first_half = 0; int first_half = 0;
int second_half = 0; int second_half = 0;
...@@ -122,8 +119,7 @@ void test_pull_fair_queue_in (void *ctx) ...@@ -122,8 +119,7 @@ void test_pull_fair_queue_in (void *ctx)
} }
// Wait for data. // Wait for data.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
zmq_msg_t msg; zmq_msg_t msg;
rc = zmq_msg_init (&msg); rc = zmq_msg_init (&msg);
...@@ -156,8 +152,7 @@ void test_pull_fair_queue_in (void *ctx) ...@@ -156,8 +152,7 @@ void test_pull_fair_queue_in (void *ctx)
close_zero_linger (pushs [peer]); close_zero_linger (pushs [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_push_block_on_send_no_peers (void *ctx) void test_push_block_on_send_no_peers (void *ctx)
...@@ -260,8 +255,7 @@ void test_destroy_queue_on_disconnect (void *ctx) ...@@ -260,8 +255,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
close_zero_linger (B); close_zero_linger (B);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
int main (void) int main (void)
......
...@@ -57,6 +57,8 @@ void test_fair_queue_in (void *ctx) ...@@ -57,6 +57,8 @@ void test_fair_queue_in (void *ctx)
assert (rc == 0); assert (rc == 0);
} }
msleep (SETTLE_TIME);
s_send_seq (reqs [0], "A", SEQ_END); s_send_seq (reqs [0], "A", SEQ_END);
s_recv_seq (rep, "A", SEQ_END); s_recv_seq (rep, "A", SEQ_END);
s_send_seq (rep, "A", SEQ_END); s_send_seq (rep, "A", SEQ_END);
...@@ -94,8 +96,7 @@ void test_fair_queue_in (void *ctx) ...@@ -94,8 +96,7 @@ void test_fair_queue_in (void *ctx)
close_zero_linger (reqs [peer]); close_zero_linger (reqs [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_envelope (void *ctx) void test_envelope (void *ctx)
...@@ -128,8 +129,7 @@ void test_envelope (void *ctx) ...@@ -128,8 +129,7 @@ void test_envelope (void *ctx)
close_zero_linger (dealer); close_zero_linger (dealer);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
int main (void) int main (void)
......
...@@ -71,8 +71,7 @@ void test_round_robin_out (void *ctx) ...@@ -71,8 +71,7 @@ void test_round_robin_out (void *ctx)
close_zero_linger (rep [peer]); close_zero_linger (rep [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_req_only_listens_to_current_peer (void *ctx) void test_req_only_listens_to_current_peer (void *ctx)
...@@ -106,8 +105,7 @@ void test_req_only_listens_to_current_peer (void *ctx) ...@@ -106,8 +105,7 @@ void test_req_only_listens_to_current_peer (void *ctx)
} }
// Wait for connects to finish. // Wait for connects to finish.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
for (size_t i = 0; i < services; ++i) { for (size_t i = 0; i < services; ++i) {
// There still is a race condition when a stale peer's message // There still is a race condition when a stale peer's message
...@@ -137,8 +135,7 @@ void test_req_only_listens_to_current_peer (void *ctx) ...@@ -137,8 +135,7 @@ void test_req_only_listens_to_current_peer (void *ctx)
close_zero_linger (router [i]); close_zero_linger (router [i]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_req_message_format (void *ctx) void test_req_message_format (void *ctx)
...@@ -196,8 +193,7 @@ void test_req_message_format (void *ctx) ...@@ -196,8 +193,7 @@ void test_req_message_format (void *ctx)
close_zero_linger (router); close_zero_linger (router);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_block_on_send_no_peers (void *ctx) void test_block_on_send_no_peers (void *ctx)
......
...@@ -63,6 +63,8 @@ void test_fair_queue_in (void *ctx) ...@@ -63,6 +63,8 @@ void test_fair_queue_in (void *ctx)
assert (rc == 0); assert (rc == 0);
} }
msleep (SETTLE_TIME);
zmq_msg_t msg; zmq_msg_t msg;
rc = zmq_msg_init (&msg); rc = zmq_msg_init (&msg);
assert (rc == 0); assert (rc == 0);
...@@ -104,8 +106,7 @@ void test_fair_queue_in (void *ctx) ...@@ -104,8 +106,7 @@ void test_fair_queue_in (void *ctx)
close_zero_linger (senders [peer]); close_zero_linger (senders [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_destroy_queue_on_disconnect (void *ctx) void test_destroy_queue_on_disconnect (void *ctx)
...@@ -130,8 +131,7 @@ void test_destroy_queue_on_disconnect (void *ctx) ...@@ -130,8 +131,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert (rc == 0); assert (rc == 0);
// Wait for connection. // Wait for connection.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
// Send a message in both directions // Send a message in both directions
s_send_seq (A, "B", "ABC", SEQ_END); s_send_seq (A, "B", "ABC", SEQ_END);
...@@ -178,8 +178,7 @@ void test_destroy_queue_on_disconnect (void *ctx) ...@@ -178,8 +178,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
close_zero_linger (B); close_zero_linger (B);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
......
...@@ -34,7 +34,6 @@ ...@@ -34,7 +34,6 @@
#ifdef _WIN32 #ifdef _WIN32
#include <Winsock2.h> #include <Winsock2.h>
#include <Ws2tcpip.h> #include <Ws2tcpip.h>
#define usleep(a) Sleep((a) / 1000)
#else #else
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
...@@ -62,6 +61,7 @@ int main (void) ...@@ -62,6 +61,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
char tmp[MSG_SIZE]; char tmp[MSG_SIZE];
memset (tmp, 0, MSG_SIZE);
zmq_send(req, tmp, MSG_SIZE, 0); zmq_send(req, tmp, MSG_SIZE, 0);
zmq_msg_t msg; zmq_msg_t msg;
...@@ -101,7 +101,7 @@ int main (void) ...@@ -101,7 +101,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// sleep a bit for the socket to be freed // sleep a bit for the socket to be freed
usleep(30000); msleep (SETTLE_TIME);
// getting name from closed socket will fail // getting name from closed socket will fail
rc = getpeername (srcFd, (struct sockaddr*) &ss, &addrlen); rc = getpeername (srcFd, (struct sockaddr*) &ss, &addrlen);
......
...@@ -97,4 +97,7 @@ int main (void) ...@@ -97,4 +97,7 @@ int main (void)
for (count = 0; count < 1000; count++) { for (count = 0; count < 1000; count++) {
close(handle[count]); close(handle[count]);
} }
zmq_close(dealer);
zmq_ctx_term(ctx);
} }
...@@ -156,6 +156,14 @@ int main (void) ...@@ -156,6 +156,14 @@ int main (void)
assert (rc == 0); assert (rc == 0);
#endif #endif
// Clean up.
rc = zmq_close (pull);
assert (rc == 0);
rc = zmq_close (push);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
// Create infrastructure (wild-card binding) // Create infrastructure (wild-card binding)
ctx = zmq_ctx_new (); ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
...@@ -188,5 +196,13 @@ int main (void) ...@@ -188,5 +196,13 @@ int main (void)
assert (rc == -1 && zmq_errno () == ENOENT); assert (rc == -1 && zmq_errno () == ENOENT);
#endif #endif
// Clean up.
rc = zmq_close (pull);
assert (rc == 0);
rc = zmq_close (push);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0; return 0;
} }
...@@ -98,7 +98,7 @@ int main (void) ...@@ -98,7 +98,7 @@ int main (void)
rc = zmq_bind (dish, "udp://127.0.0.1:5556"); rc = zmq_bind (dish, "udp://127.0.0.1:5556");
assert (rc == 0); assert (rc == 0);
zmq_sleep (1); msleep (SETTLE_TIME);
rc = zmq_join (dish, "TV"); rc = zmq_join (dish, "TV");
assert (rc == 0); assert (rc == 0);
......
...@@ -55,6 +55,8 @@ void pre_allocate_sock (void *zmq_socket, const char *address, ...@@ -55,6 +55,8 @@ void pre_allocate_sock (void *zmq_socket, const char *address,
rc = zmq_setsockopt (zmq_socket, ZMQ_USE_FD, &s_pre, rc = zmq_setsockopt (zmq_socket, ZMQ_USE_FD, &s_pre,
sizeof (s_pre)); sizeof (s_pre));
assert(rc == 0); assert(rc == 0);
freeaddrinfo(addr);
} }
void test_req_rep () void test_req_rep ()
......
...@@ -124,7 +124,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect() ...@@ -124,7 +124,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic, 1) == 0); assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic, 1) == 0);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// proxy reroutes and confirms subscriptions // proxy reroutes and confirms subscriptions
char sub_buff[2]; char sub_buff[2];
...@@ -141,7 +141,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect() ...@@ -141,7 +141,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic, 1) == 0); assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic, 1) == 0);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// proxy reroutes // proxy reroutes
assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2); assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2);
...@@ -151,14 +151,14 @@ int test_xpub_proxy_unsubscribe_on_disconnect() ...@@ -151,14 +151,14 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2); assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// let publisher send a msg // let publisher send a msg
assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1); assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (pub, payload, 1, 0) == 1); assert (zmq_send (pub, payload, 1, 0) == 1);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// proxy reroutes data messages to subscribers // proxy reroutes data messages to subscribers
char topic_buff[1]; char topic_buff[1];
...@@ -171,7 +171,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect() ...@@ -171,7 +171,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1); assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// each subscriber should now get a message // each subscriber should now get a message
assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1); assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1);
...@@ -189,7 +189,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect() ...@@ -189,7 +189,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
assert (zmq_close (sub2) == 0); assert (zmq_close (sub2) == 0);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// unsubscribe messages are passed from proxy to publisher // unsubscribe messages are passed from proxy to publisher
assert (zmq_recv (xpub_proxy, sub_buff, 2, 0) == 2); assert (zmq_recv (xpub_proxy, sub_buff, 2, 0) == 2);
...@@ -207,14 +207,14 @@ int test_xpub_proxy_unsubscribe_on_disconnect() ...@@ -207,14 +207,14 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2); assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// let publisher send a msg // let publisher send a msg
assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1); assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (pub, payload, 1, 0) == 1); assert (zmq_send (pub, payload, 1, 0) == 1);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// nothing should come to the proxy // nothing should come to the proxy
assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == -1); assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == -1);
...@@ -273,7 +273,7 @@ int test_missing_subscriptions() ...@@ -273,7 +273,7 @@ int test_missing_subscriptions()
assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1) == 0); assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1) == 0);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// proxy now reroutes and confirms subscriptions // proxy now reroutes and confirms subscriptions
char buffer[2]; char buffer[2];
...@@ -290,7 +290,7 @@ int test_missing_subscriptions() ...@@ -290,7 +290,7 @@ int test_missing_subscriptions()
assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2); assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// let publisher send 2 msgs, each with its own topic // let publisher send 2 msgs, each with its own topic
assert (zmq_send (pub, topic1, 1, ZMQ_SNDMORE) == 1); assert (zmq_send (pub, topic1, 1, ZMQ_SNDMORE) == 1);
...@@ -299,7 +299,7 @@ int test_missing_subscriptions() ...@@ -299,7 +299,7 @@ int test_missing_subscriptions()
assert (zmq_send (pub, payload, 1, 0) == 1); assert (zmq_send (pub, payload, 1, 0) == 1);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// proxy reroutes data messages to subscribers // proxy reroutes data messages to subscribers
char topic_buff [1]; char topic_buff [1];
...@@ -319,7 +319,7 @@ int test_missing_subscriptions() ...@@ -319,7 +319,7 @@ int test_missing_subscriptions()
assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1); assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// each subscriber should now get a message // each subscriber should now get a message
assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1); assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1);
......
...@@ -41,7 +41,7 @@ ...@@ -41,7 +41,7 @@
// This defines the settle time used in tests; raise this if we // This defines the settle time used in tests; raise this if we
// get test failures on slower systems due to binds/connects not // get test failures on slower systems due to binds/connects not
// settled. Tested to work reliably at 1 msec on a fast PC. // settled. Tested to work reliably at 1 msec on a fast PC.
#define SETTLE_TIME 50 // In msec #define SETTLE_TIME 300 // In msec
#undef NDEBUG #undef NDEBUG
#include <time.h> #include <time.h>
...@@ -56,6 +56,7 @@ ...@@ -56,6 +56,7 @@
# pragma warning(disable:4996) # pragma warning(disable:4996)
# endif # endif
#else #else
# include <pthread.h>
# include <unistd.h> # include <unistd.h>
# include <signal.h> # include <signal.h>
# include <stdlib.h> # include <stdlib.h>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment