Commit 4f49e742 authored by Francesco Montorsi's avatar Francesco Montorsi

Remove race condition from XPUB/SUB test in test_blocking()

parent 9f962ea0
...@@ -95,12 +95,21 @@ int test_defaults (int send_hwm_, int msg_cnt_, const char *endpoint) ...@@ -95,12 +95,21 @@ int test_defaults (int send_hwm_, int msg_cnt_, const char *endpoint)
return recv_count; return recv_count;
} }
int receive (void *socket_) int receive (void *socket_, int *is_termination)
{ {
int recv_count = 0; int recv_count = 0;
*is_termination = 0;
// Now receive all sent messages // Now receive all sent messages
while (0 == zmq_recv (socket_, NULL, 0, 0)) { char buffer[255];
int len;
while ((len = zmq_recv (socket_, buffer, sizeof (buffer), 0)) >= 0) {
++recv_count; ++recv_count;
if (len == 3 && strncmp (buffer, "end", len) == 0) {
*is_termination = 1;
return recv_count;
}
} }
return recv_count; return recv_count;
...@@ -141,19 +150,36 @@ int test_blocking (int send_hwm_, int msg_cnt_, const char *endpoint) ...@@ -141,19 +150,36 @@ int test_blocking (int send_hwm_, int msg_cnt_, const char *endpoint)
// Send until we block // Send until we block
int send_count = 0; int send_count = 0;
int recv_count = 0; int recv_count = 0;
int blocked_count = 0;
int is_termination = 0;
while (send_count < msg_cnt_) { while (send_count < msg_cnt_) {
const int rc = zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT); const int rc = zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT);
if (rc == 0) { if (rc == 0) {
++send_count; ++send_count;
} else if (-1 == rc) { } else if (-1 == rc) {
// if the PUB socket blocks due to HWM, errno should be EAGAIN: // if the PUB socket blocks due to HWM, errno should be EAGAIN:
blocked_count++;
TEST_ASSERT_EQUAL_INT (EAGAIN, errno); TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
recv_count += receive (sub_socket); recv_count += receive (sub_socket, &is_termination);
} }
} }
msleep (2 * SETTLE_TIME); // required for TCP transport // if send_hwm_ < msg_cnt_, we should block at least once:
recv_count += receive (sub_socket); TEST_ASSERT (blocked_count > 0);
// dequeue SUB socket again, to make sure XPUB has space to send the termination message
recv_count += receive (sub_socket, &is_termination);
// send termination message
send_string_expect_success (pub_socket, "end", 0);
// now block on the SUB side till we get the termination message
while (is_termination == 0)
recv_count += receive (sub_socket, &is_termination);
// remove termination message from the count:
recv_count--;
TEST_ASSERT_EQUAL_INT (send_count, recv_count); TEST_ASSERT_EQUAL_INT (send_count, recv_count);
// Clean up // Clean up
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment