Commit 4a84f8a0 authored by Luca Boccassi's avatar Luca Boccassi

Problem: tests use various sleep patterns to wait

Solution: use msleep (SETTLE_TIME) everywhere when waiting for the
connections/sockets to be settled instead of a variety of patterns
and functions to make tests more coherent.
parent 955b51df
...@@ -39,7 +39,7 @@ get_monitor_event (void *monitor) ...@@ -39,7 +39,7 @@ get_monitor_event (void *monitor)
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init (&msg); zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1) { if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1) {
msleep(150); msleep (SETTLE_TIME);
continue; // Interruped, presumably continue; // Interruped, presumably
} }
assert (zmq_msg_more (&msg)); assert (zmq_msg_more (&msg));
...@@ -251,7 +251,7 @@ test_heartbeat_ttl (void) ...@@ -251,7 +251,7 @@ test_heartbeat_ttl (void)
rc = get_monitor_event(server_mon); rc = get_monitor_event(server_mon);
assert(rc == ZMQ_EVENT_ACCEPTED); assert(rc == ZMQ_EVENT_ACCEPTED);
msleep(100); msleep (SETTLE_TIME);
// We should have been disconnected // We should have been disconnected
rc = get_monitor_event(server_mon); rc = get_monitor_event(server_mon);
...@@ -291,7 +291,7 @@ test_heartbeat_notimeout (int is_curve) ...@@ -291,7 +291,7 @@ test_heartbeat_notimeout (int is_curve)
rc = zmq_connect(client, "tcp://127.0.0.1:5556"); rc = zmq_connect(client, "tcp://127.0.0.1:5556");
// Give it a sec to connect and handshake // Give it a sec to connect and handshake
msleep(100); msleep (SETTLE_TIME);
// By now everything should report as connected // By now everything should report as connected
rc = get_monitor_event(server_mon); rc = get_monitor_event(server_mon);
......
...@@ -180,7 +180,7 @@ void test_reset_hwm () ...@@ -180,7 +180,7 @@ void test_reset_hwm ()
rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0); rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0);
assert (rc == 0); assert (rc == 0);
msleep (100); msleep (SETTLE_TIME);
// Send messages // Send messages
int send_count = 0; int send_count = 0;
...@@ -188,7 +188,7 @@ void test_reset_hwm () ...@@ -188,7 +188,7 @@ void test_reset_hwm ()
++send_count; ++send_count;
assert (first_count == send_count); assert (first_count == send_count);
msleep (100); msleep (SETTLE_TIME);
// Now receive all sent messages // Now receive all sent messages
int recv_count = 0; int recv_count = 0;
...@@ -198,7 +198,7 @@ void test_reset_hwm () ...@@ -198,7 +198,7 @@ void test_reset_hwm ()
} }
assert (first_count == recv_count); assert (first_count == recv_count);
msleep (100); msleep (SETTLE_TIME);
// Send messages // Send messages
send_count = 0; send_count = 0;
...@@ -206,7 +206,7 @@ void test_reset_hwm () ...@@ -206,7 +206,7 @@ void test_reset_hwm ()
++send_count; ++send_count;
assert (second_count == send_count); assert (second_count == send_count);
msleep (100); msleep (SETTLE_TIME);
// Now receive all sent messages // Now receive all sent messages
recv_count = 0; recv_count = 0;
......
...@@ -65,7 +65,7 @@ int main (void) { ...@@ -65,7 +65,7 @@ int main (void) {
rc = zmq_msg_close(&msg); rc = zmq_msg_close(&msg);
assert (rc == 0); assert (rc == 0);
msleep(50); msleep (SETTLE_TIME);
assert (memcmp(hint, "freed", 5) == 0); assert (memcmp(hint, "freed", 5) == 0);
memcpy(hint, (void *) "hint", 4); memcpy(hint, (void *) "hint", 4);
...@@ -81,7 +81,7 @@ int main (void) { ...@@ -81,7 +81,7 @@ int main (void) {
rc = zmq_msg_close(&msg); rc = zmq_msg_close(&msg);
assert (rc == 0); assert (rc == 0);
msleep(50); msleep (SETTLE_TIME);
assert (memcmp(hint, "freed", 5) == 0); assert (memcmp(hint, "freed", 5) == 0);
memcpy(hint, (void *) "hint", 4); memcpy(hint, (void *) "hint", 4);
...@@ -97,7 +97,7 @@ int main (void) { ...@@ -97,7 +97,7 @@ int main (void) {
assert (rc == 255); assert (rc == 255);
assert (memcmp(data, buf, 4) == 0); assert (memcmp(data, buf, 4) == 0);
msleep(50); msleep (SETTLE_TIME);
assert (memcmp(hint, "freed", 5) == 0); assert (memcmp(hint, "freed", 5) == 0);
memcpy(hint, (void *) "hint", 4); memcpy(hint, (void *) "hint", 4);
rc = zmq_msg_close(&msg); rc = zmq_msg_close(&msg);
...@@ -122,7 +122,7 @@ int main (void) { ...@@ -122,7 +122,7 @@ int main (void) {
rc = zmq_msg_close(&msg); rc = zmq_msg_close(&msg);
assert (rc == 0); assert (rc == 0);
msleep(50); msleep (SETTLE_TIME);
assert (memcmp(hint, "freed", 5) == 0); assert (memcmp(hint, "freed", 5) == 0);
memcpy(hint, (void *) "hint", 4); memcpy(hint, (void *) "hint", 4);
......
...@@ -96,7 +96,7 @@ int main (void) ...@@ -96,7 +96,7 @@ int main (void)
rc = zmq_connect (publisher, "tcp://127.0.0.1:15564"); rc = zmq_connect (publisher, "tcp://127.0.0.1:15564");
assert (rc == 0); assert (rc == 0);
msleep (50); msleep (SETTLE_TIME);
rc = zmq_send (publisher, "This is a test", 14, 0); rc = zmq_send (publisher, "This is a test", 14, 0);
assert (rc == 14); assert (rc == 14);
......
...@@ -117,7 +117,7 @@ int main (void) ...@@ -117,7 +117,7 @@ int main (void)
rc = zmq_connect (dish, "tcp://127.0.0.1:5556"); rc = zmq_connect (dish, "tcp://127.0.0.1:5556");
assert (rc == 0); assert (rc == 0);
zmq_sleep (1); msleep (SETTLE_TIME);
zmq_msg_t msg; zmq_msg_t msg;
......
...@@ -103,8 +103,7 @@ int main (void) ...@@ -103,8 +103,7 @@ int main (void)
s_send_seq (rep [3], "BAD", SEQ_END); s_send_seq (rep [3], "BAD", SEQ_END);
// Wait for message to be there. // Wait for message to be there.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
// Without receiving that reply, send another request on the REQ socket // Without receiving that reply, send another request on the REQ socket
s_send_seq (req, "I", SEQ_END); s_send_seq (req, "I", SEQ_END);
...@@ -127,8 +126,7 @@ int main (void) ...@@ -127,8 +126,7 @@ int main (void)
close_zero_linger (rep [peer]); close_zero_linger (rep [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
rc = zmq_ctx_term (ctx); rc = zmq_ctx_term (ctx);
assert (rc == 0); assert (rc == 0);
......
...@@ -150,7 +150,7 @@ void test_decrease_when_full() ...@@ -150,7 +150,7 @@ void test_decrease_when_full()
assert(read_count == 101); assert(read_count == 101);
// Give io thread some time to catch up // Give io thread some time to catch up
msleep(10); msleep (SETTLE_TIME);
// Fill up to new hwm // Fill up to new hwm
send_count = 0; send_count = 0;
......
...@@ -55,8 +55,7 @@ void test_round_robin_out (void *ctx) ...@@ -55,8 +55,7 @@ void test_round_robin_out (void *ctx)
} }
// Wait for connections. // Wait for connections.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
// Send all requests // Send all requests
for (size_t i = 0; i < services; ++i) for (size_t i = 0; i < services; ++i)
...@@ -78,8 +77,7 @@ void test_round_robin_out (void *ctx) ...@@ -78,8 +77,7 @@ void test_round_robin_out (void *ctx)
close_zero_linger (rep [peer]); close_zero_linger (rep [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_fair_queue_in (void *ctx) void test_fair_queue_in (void *ctx)
...@@ -122,8 +120,7 @@ void test_fair_queue_in (void *ctx) ...@@ -122,8 +120,7 @@ void test_fair_queue_in (void *ctx)
s_send_seq (senders [peer], "B", SEQ_END); s_send_seq (senders [peer], "B", SEQ_END);
// Wait for data. // Wait for data.
rc = zmq_poll (0, 0, 50); msleep (SETTLE_TIME);
assert (rc == 0);
// handle the requests // handle the requests
for (size_t peer = 0; peer < services; ++peer) for (size_t peer = 0; peer < services; ++peer)
...@@ -138,8 +135,7 @@ void test_fair_queue_in (void *ctx) ...@@ -138,8 +135,7 @@ void test_fair_queue_in (void *ctx)
close_zero_linger (senders [peer]); close_zero_linger (senders [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_destroy_queue_on_disconnect (void *ctx) void test_destroy_queue_on_disconnect (void *ctx)
...@@ -201,8 +197,7 @@ void test_destroy_queue_on_disconnect (void *ctx) ...@@ -201,8 +197,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
close_zero_linger (B); close_zero_linger (B);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_block_on_send_no_peers (void *ctx) void test_block_on_send_no_peers (void *ctx)
......
...@@ -55,8 +55,7 @@ void test_push_round_robin_out (void *ctx) ...@@ -55,8 +55,7 @@ void test_push_round_robin_out (void *ctx)
} }
// Wait for connections. // Wait for connections.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
// Send 2N messages // Send 2N messages
for (size_t peer = 0; peer < services; ++peer) for (size_t peer = 0; peer < services; ++peer)
...@@ -76,8 +75,7 @@ void test_push_round_robin_out (void *ctx) ...@@ -76,8 +75,7 @@ void test_push_round_robin_out (void *ctx)
close_zero_linger (pulls [peer]); close_zero_linger (pulls [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_pull_fair_queue_in (void *ctx) void test_pull_fair_queue_in (void *ctx)
...@@ -100,8 +98,7 @@ void test_pull_fair_queue_in (void *ctx) ...@@ -100,8 +98,7 @@ void test_pull_fair_queue_in (void *ctx)
} }
// Wait for connections. // Wait for connections.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
int first_half = 0; int first_half = 0;
int second_half = 0; int second_half = 0;
...@@ -122,8 +119,7 @@ void test_pull_fair_queue_in (void *ctx) ...@@ -122,8 +119,7 @@ void test_pull_fair_queue_in (void *ctx)
} }
// Wait for data. // Wait for data.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
zmq_msg_t msg; zmq_msg_t msg;
rc = zmq_msg_init (&msg); rc = zmq_msg_init (&msg);
...@@ -156,8 +152,7 @@ void test_pull_fair_queue_in (void *ctx) ...@@ -156,8 +152,7 @@ void test_pull_fair_queue_in (void *ctx)
close_zero_linger (pushs [peer]); close_zero_linger (pushs [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_push_block_on_send_no_peers (void *ctx) void test_push_block_on_send_no_peers (void *ctx)
...@@ -260,8 +255,7 @@ void test_destroy_queue_on_disconnect (void *ctx) ...@@ -260,8 +255,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
close_zero_linger (B); close_zero_linger (B);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
int main (void) int main (void)
......
...@@ -94,8 +94,7 @@ void test_fair_queue_in (void *ctx) ...@@ -94,8 +94,7 @@ void test_fair_queue_in (void *ctx)
close_zero_linger (reqs [peer]); close_zero_linger (reqs [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_envelope (void *ctx) void test_envelope (void *ctx)
...@@ -128,8 +127,7 @@ void test_envelope (void *ctx) ...@@ -128,8 +127,7 @@ void test_envelope (void *ctx)
close_zero_linger (dealer); close_zero_linger (dealer);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
int main (void) int main (void)
......
...@@ -71,8 +71,7 @@ void test_round_robin_out (void *ctx) ...@@ -71,8 +71,7 @@ void test_round_robin_out (void *ctx)
close_zero_linger (rep [peer]); close_zero_linger (rep [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_req_only_listens_to_current_peer (void *ctx) void test_req_only_listens_to_current_peer (void *ctx)
...@@ -106,8 +105,7 @@ void test_req_only_listens_to_current_peer (void *ctx) ...@@ -106,8 +105,7 @@ void test_req_only_listens_to_current_peer (void *ctx)
} }
// Wait for connects to finish. // Wait for connects to finish.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
for (size_t i = 0; i < services; ++i) { for (size_t i = 0; i < services; ++i) {
// There still is a race condition when a stale peer's message // There still is a race condition when a stale peer's message
...@@ -137,8 +135,7 @@ void test_req_only_listens_to_current_peer (void *ctx) ...@@ -137,8 +135,7 @@ void test_req_only_listens_to_current_peer (void *ctx)
close_zero_linger (router [i]); close_zero_linger (router [i]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_req_message_format (void *ctx) void test_req_message_format (void *ctx)
...@@ -196,8 +193,7 @@ void test_req_message_format (void *ctx) ...@@ -196,8 +193,7 @@ void test_req_message_format (void *ctx)
close_zero_linger (router); close_zero_linger (router);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_block_on_send_no_peers (void *ctx) void test_block_on_send_no_peers (void *ctx)
......
...@@ -104,8 +104,7 @@ void test_fair_queue_in (void *ctx) ...@@ -104,8 +104,7 @@ void test_fair_queue_in (void *ctx)
close_zero_linger (senders [peer]); close_zero_linger (senders [peer]);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
void test_destroy_queue_on_disconnect (void *ctx) void test_destroy_queue_on_disconnect (void *ctx)
...@@ -130,8 +129,7 @@ void test_destroy_queue_on_disconnect (void *ctx) ...@@ -130,8 +129,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert (rc == 0); assert (rc == 0);
// Wait for connection. // Wait for connection.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
// Send a message in both directions // Send a message in both directions
s_send_seq (A, "B", "ABC", SEQ_END); s_send_seq (A, "B", "ABC", SEQ_END);
...@@ -178,8 +176,7 @@ void test_destroy_queue_on_disconnect (void *ctx) ...@@ -178,8 +176,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
close_zero_linger (B); close_zero_linger (B);
// Wait for disconnects. // Wait for disconnects.
rc = zmq_poll (0, 0, 100); msleep (SETTLE_TIME);
assert (rc == 0);
} }
......
...@@ -34,7 +34,6 @@ ...@@ -34,7 +34,6 @@
#ifdef _WIN32 #ifdef _WIN32
#include <Winsock2.h> #include <Winsock2.h>
#include <Ws2tcpip.h> #include <Ws2tcpip.h>
#define usleep(a) Sleep((a) / 1000)
#else #else
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
...@@ -102,7 +101,7 @@ int main (void) ...@@ -102,7 +101,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// sleep a bit for the socket to be freed // sleep a bit for the socket to be freed
usleep(30000); msleep (SETTLE_TIME);
// getting name from closed socket will fail // getting name from closed socket will fail
rc = getpeername (srcFd, (struct sockaddr*) &ss, &addrlen); rc = getpeername (srcFd, (struct sockaddr*) &ss, &addrlen);
......
...@@ -98,7 +98,7 @@ int main (void) ...@@ -98,7 +98,7 @@ int main (void)
rc = zmq_bind (dish, "udp://127.0.0.1:5556"); rc = zmq_bind (dish, "udp://127.0.0.1:5556");
assert (rc == 0); assert (rc == 0);
zmq_sleep (1); msleep (SETTLE_TIME);
rc = zmq_join (dish, "TV"); rc = zmq_join (dish, "TV");
assert (rc == 0); assert (rc == 0);
......
...@@ -124,7 +124,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect() ...@@ -124,7 +124,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic, 1) == 0); assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic, 1) == 0);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// proxy reroutes and confirms subscriptions // proxy reroutes and confirms subscriptions
char sub_buff[2]; char sub_buff[2];
...@@ -141,7 +141,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect() ...@@ -141,7 +141,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic, 1) == 0); assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic, 1) == 0);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// proxy reroutes // proxy reroutes
assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2); assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2);
...@@ -151,14 +151,14 @@ int test_xpub_proxy_unsubscribe_on_disconnect() ...@@ -151,14 +151,14 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2); assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// let publisher send a msg // let publisher send a msg
assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1); assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (pub, payload, 1, 0) == 1); assert (zmq_send (pub, payload, 1, 0) == 1);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// proxy reroutes data messages to subscribers // proxy reroutes data messages to subscribers
char topic_buff[1]; char topic_buff[1];
...@@ -171,7 +171,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect() ...@@ -171,7 +171,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1); assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// each subscriber should now get a message // each subscriber should now get a message
assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1); assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1);
...@@ -189,7 +189,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect() ...@@ -189,7 +189,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
assert (zmq_close (sub2) == 0); assert (zmq_close (sub2) == 0);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// unsubscribe messages are passed from proxy to publisher // unsubscribe messages are passed from proxy to publisher
assert (zmq_recv (xpub_proxy, sub_buff, 2, 0) == 2); assert (zmq_recv (xpub_proxy, sub_buff, 2, 0) == 2);
...@@ -207,14 +207,14 @@ int test_xpub_proxy_unsubscribe_on_disconnect() ...@@ -207,14 +207,14 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2); assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// let publisher send a msg // let publisher send a msg
assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1); assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (pub, payload, 1, 0) == 1); assert (zmq_send (pub, payload, 1, 0) == 1);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// nothing should come to the proxy // nothing should come to the proxy
assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == -1); assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == -1);
...@@ -273,7 +273,7 @@ int test_missing_subscriptions() ...@@ -273,7 +273,7 @@ int test_missing_subscriptions()
assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1) == 0); assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1) == 0);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// proxy now reroutes and confirms subscriptions // proxy now reroutes and confirms subscriptions
char buffer[2]; char buffer[2];
...@@ -290,7 +290,7 @@ int test_missing_subscriptions() ...@@ -290,7 +290,7 @@ int test_missing_subscriptions()
assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2); assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// let publisher send 2 msgs, each with its own topic // let publisher send 2 msgs, each with its own topic
assert (zmq_send (pub, topic1, 1, ZMQ_SNDMORE) == 1); assert (zmq_send (pub, topic1, 1, ZMQ_SNDMORE) == 1);
...@@ -299,7 +299,7 @@ int test_missing_subscriptions() ...@@ -299,7 +299,7 @@ int test_missing_subscriptions()
assert (zmq_send (pub, payload, 1, 0) == 1); assert (zmq_send (pub, payload, 1, 0) == 1);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// proxy reroutes data messages to subscribers // proxy reroutes data messages to subscribers
char topic_buff [1]; char topic_buff [1];
...@@ -319,7 +319,7 @@ int test_missing_subscriptions() ...@@ -319,7 +319,7 @@ int test_missing_subscriptions()
assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1); assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
// wait // wait
assert (zmq_poll (0, 0, 100) == 0); msleep (SETTLE_TIME);
// each subscriber should now get a message // each subscriber should now get a message
assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1); assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment