Unverified Commit 02019d9f authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3254 from f18m/hwm_test_fix

Improve test reliability by: a) using XPUB in place of PUB to ensure …
parents b933cb9d 72418e9d
...@@ -36,7 +36,7 @@ ...@@ -36,7 +36,7 @@
// //
// Topology: // Topology:
// //
// PUB SUB // XPUB SUB
// | | // | |
// \-----> XSUB -> XPUB -----/ // \-----> XSUB -> XPUB -----/
// ^^^^^^^^^^^^^^ // ^^^^^^^^^^^^^^
...@@ -46,10 +46,10 @@ ...@@ -46,10 +46,10 @@
// Then the PUB socket starts flooding the Proxy. The SUB is artificially slow // Then the PUB socket starts flooding the Proxy. The SUB is artificially slow
// at receiving messages. // at receiving messages.
// This scenario simulates what happens when a SUB is slower than // This scenario simulates what happens when a SUB is slower than
// its PUB: since ZMQ_XPUB_NODROP=1, the XPUB will block and then // its (X)PUB: since ZMQ_XPUB_NODROP=1, the XPUB will block and then
// also the PUB socket will block. // also the (X)PUB socket will block.
// The result is that 2*HWM messages will be sent before the PUB blocks. // The exact number of the messages that go through before (X)PUB blocks depends
// // on ZeroMQ internals and how the OS will schedule the different threads.
// In the meanwhile asking statistics to the Proxy must NOT be blocking. // In the meanwhile asking statistics to the Proxy must NOT be blocking.
// //
...@@ -57,6 +57,7 @@ ...@@ -57,6 +57,7 @@
#define HWM 10 #define HWM 10
#define NUM_BYTES_PER_MSG 50000 #define NUM_BYTES_PER_MSG 50000
typedef struct typedef struct
{ {
void *context; void *context;
...@@ -67,21 +68,6 @@ typedef struct ...@@ -67,21 +68,6 @@ typedef struct
bool subscriber_received_all; bool subscriber_received_all;
} proxy_hwm_cfg_t; } proxy_hwm_cfg_t;
static void lower_tcp_buff (void *sock_)
{
int sndBuff;
size_t sndBuffSz = sizeof sndBuff;
int rc = zmq_getsockopt (sock_, ZMQ_SNDBUF, &sndBuff, &sndBuffSz);
assert (rc == 0);
int newBuff = 1000;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sock_, ZMQ_SNDBUF, &newBuff, sizeof (newBuff)));
rc = zmq_getsockopt (sock_, ZMQ_SNDBUF, &sndBuff, &sndBuffSz);
assert (rc == 0);
}
static void lower_hwm (void *skt) static void lower_hwm (void *skt)
{ {
int send_hwm_ = HWM; int send_hwm_ = HWM;
...@@ -96,7 +82,7 @@ static void publisher_thread_main (void *pvoid) ...@@ -96,7 +82,7 @@ static void publisher_thread_main (void *pvoid)
{ {
proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
void *pubsocket = zmq_socket (cfg->context, ZMQ_PUB); void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB);
assert (pubsocket); assert (pubsocket);
lower_hwm (pubsocket); lower_hwm (pubsocket);
...@@ -107,7 +93,10 @@ static void publisher_thread_main (void *pvoid) ...@@ -107,7 +93,10 @@ static void publisher_thread_main (void *pvoid)
TEST_ASSERT_SUCCESS_ERRNO ( TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval))); zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));
msleep (SETTLE_TIME); // Wait before starting TX operations till 1 subscriber has subscribed
// (in this test there's 1 subscriber only)
const char subscription_to_all_topics[] = {1, 0};
recv_string_expect_success (pubsocket, subscription_to_all_topics, 0);
uint64_t send_count = 0; uint64_t send_count = 0;
while (true) { while (true) {
...@@ -129,8 +118,16 @@ static void publisher_thread_main (void *pvoid) ...@@ -129,8 +118,16 @@ static void publisher_thread_main (void *pvoid)
} }
// VERIFY EXPECTED RESULTS // VERIFY EXPECTED RESULTS
// EXPLANATION FOR TX TO BE CONSIDERED SUCCESSFUL:
TEST_ASSERT (4 * HWM == send_count || 2 * HWM == send_count); // this test has 3 threads doing I/O across 2 queues. Depending on the scheduling,
// it might happen that 20, 30 or 40 messages go through before the pub blocks.
// That's because the receiver thread gets kicked once every (hwm_ + 1) / 2 sent
// messages (search for zeromq sources compute_lwm function).
// So depending on the scheduling of the second thread, the publisher might get one,
// two or three more batches in. The ceiling is 40 as there's 2 queues.
//
assert (4 * HWM == send_count || 3 * HWM == send_count
|| 2 * HWM == send_count);
// CLEANUP // CLEANUP
...@@ -151,7 +148,6 @@ static void subscriber_thread_main (void *pvoid) ...@@ -151,7 +148,6 @@ static void subscriber_thread_main (void *pvoid)
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subsocket, cfg->backend_endpoint)); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subsocket, cfg->backend_endpoint));
lower_tcp_buff (subsocket);
// receive all sent messages // receive all sent messages
uint64_t rxsuccess = 0; uint64_t rxsuccess = 0;
...@@ -179,8 +175,12 @@ static void subscriber_thread_main (void *pvoid) ...@@ -179,8 +175,12 @@ static void subscriber_thread_main (void *pvoid)
// VERIFY EXPECTED RESULTS // VERIFY EXPECTED RESULTS
// EXPLANATION FOR RX TO BE CONSIDERED SUCCESSFUL:
// see publisher thread why we have 3 possible outcomes as number of RX messages
assert (4 * HWM == rxsuccess || 3 * HWM == rxsuccess
|| 2 * HWM == rxsuccess);
TEST_ASSERT (4 * HWM == rxsuccess || 2 * HWM == rxsuccess);
// INFORM THAT WE COMPLETED: // INFORM THAT WE COMPLETED:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment