Commit dfba19c4 authored by Christian Kamm's avatar Christian Kamm

Make pipeline/reqrep tests try tcp:// endpoints.

The inproc:// endpoints sometimes use different code paths so
testing with regular tcp:// endpoints as well can show different
issues.
parent 8fd163cf
......@@ -21,12 +21,15 @@
#include <stdlib.h>
#include "testutil.hpp"
const char *bind_address = 0;
const char *connect_address = 0;
void test_round_robin_out (void *ctx)
{
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
int rc = zmq_bind (dealer, "inproc://b");
int rc = zmq_bind (dealer, bind_address);
assert (rc == 0);
const size_t N = 5;
......@@ -40,10 +43,14 @@ void test_round_robin_out (void *ctx)
rc = zmq_setsockopt (rep[i], ZMQ_RCVTIMEO, &timeout, sizeof(int));
assert (rc == 0);
rc = zmq_connect (rep[i], "inproc://b");
rc = zmq_connect (rep[i], connect_address);
assert (rc == 0);
}
// Wait for connections.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
// Send N requests
for (size_t i = 0; i < N; ++i)
{
......@@ -62,14 +69,16 @@ void test_round_robin_out (void *ctx)
rc = zmq_msg_close (&msg);
assert (rc == 0);
rc = zmq_close (dealer);
assert (rc == 0);
close_zero_linger (dealer);
for (size_t i = 0; i < N; ++i)
{
rc = zmq_close (rep[i]);
assert (rc == 0);
close_zero_linger (rep[i]);
}
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
}
void test_fair_queue_in (void *ctx)
......@@ -81,7 +90,7 @@ void test_fair_queue_in (void *ctx)
int rc = zmq_setsockopt (receiver, ZMQ_RCVTIMEO, &timeout, sizeof(int));
assert (rc == 0);
rc = zmq_bind (receiver, "inproc://a");
rc = zmq_bind (receiver, bind_address);
assert (rc == 0);
const size_t N = 5;
......@@ -94,7 +103,7 @@ void test_fair_queue_in (void *ctx)
rc = zmq_setsockopt (senders[i], ZMQ_RCVTIMEO, &timeout, sizeof(int));
assert (rc == 0);
rc = zmq_connect (senders[i], "inproc://a");
rc = zmq_connect (senders[i], connect_address);
assert (rc == 0);
}
......@@ -111,32 +120,32 @@ void test_fair_queue_in (void *ctx)
// send N requests
for (size_t i = 0; i < N; ++i)
{
char *str = strdup("A");
str[0] += i;
s_send_seq (senders[i], str, SEQ_END);
free (str);
s_send_seq (senders[i], "B", SEQ_END);
}
// Wait for data.
rc = zmq_poll (0, 0, 50);
assert (rc == 0);
// handle N requests
for (size_t i = 0; i < N; ++i)
{
char *str = strdup("A");
str[0] += i;
s_recv_seq (receiver, str, SEQ_END);
free (str);
s_recv_seq (receiver, "B", SEQ_END);
}
rc = zmq_msg_close (&msg);
assert (rc == 0);
rc = zmq_close (receiver);
assert (rc == 0);
close_zero_linger (receiver);
for (size_t i = 0; i < N; ++i)
{
rc = zmq_close (senders[i]);
assert (rc == 0);
close_zero_linger (senders[i]);
}
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
}
void test_destroy_queue_on_disconnect (void *ctx)
......@@ -144,26 +153,28 @@ void test_destroy_queue_on_disconnect (void *ctx)
void *A = zmq_socket (ctx, ZMQ_DEALER);
assert (A);
int rc = zmq_bind (A, "inproc://d");
int rc = zmq_bind (A, bind_address);
assert (rc == 0);
void *B = zmq_socket (ctx, ZMQ_DEALER);
assert (B);
rc = zmq_connect (B, "inproc://d");
rc = zmq_connect (B, connect_address);
assert (rc == 0);
// Send a message in both directions
s_send_seq (A, "ABC", SEQ_END);
s_send_seq (B, "DEF", SEQ_END);
rc = zmq_disconnect (B, "inproc://d");
rc = zmq_disconnect (B, connect_address);
assert (rc == 0);
// Disconnect may take time and need command processing.
zmq_pollitem_t poller[2] = { { A, 0, 0, 0 }, { B, 0, 0, 0 } };
rc = zmq_poll (poller, 2, 100);
assert (rc == 0);
rc = zmq_poll (poller, 2, 100);
assert (rc == 0);
// No messages should be available, sending should fail.
zmq_msg_t msg;
......@@ -178,7 +189,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert (errno == EAGAIN);
// After a reconnect of B, the messages should still be gone
rc = zmq_connect (B, "inproc://d");
rc = zmq_connect (B, connect_address);
assert (rc == 0);
rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT);
......@@ -192,10 +203,11 @@ void test_destroy_queue_on_disconnect (void *ctx)
rc = zmq_msg_close (&msg);
assert (rc == 0);
rc = zmq_close (A);
assert (rc == 0);
close_zero_linger (A);
close_zero_linger (B);
rc = zmq_close (B);
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
}
......@@ -225,21 +237,29 @@ int main ()
void *ctx = zmq_ctx_new ();
assert (ctx);
// SHALL route outgoing messages to available peers using a round-robin
// strategy.
test_round_robin_out (ctx);
const char *binds[] = { "inproc://a", "tcp://*:5555" };
const char *connects[] = { "inproc://a", "tcp://localhost:5555" };
for (int i = 0; i < 2; ++i) {
bind_address = binds[i];
connect_address = connects[i];
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_fair_queue_in (ctx);
// SHALL route outgoing messages to available peers using a round-robin
// strategy.
test_round_robin_out (ctx);
// SHALL block on sending, or return a suitable error, when it has no connected peers.
test_block_on_send_no_peers (ctx);
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_fair_queue_in (ctx);
// SHALL create a double queue when a peer connects to it. If this peer
// disconnects, the DEALER socket SHALL destroy its double queue and SHALL
// discard any messages it contains.
test_destroy_queue_on_disconnect (ctx);
// SHALL block on sending, or return a suitable error, when it has no connected peers.
test_block_on_send_no_peers (ctx);
// SHALL create a double queue when a peer connects to it. If this peer
// disconnects, the DEALER socket SHALL destroy its double queue and SHALL
// discard any messages it contains.
test_destroy_queue_on_disconnect (ctx);
}
int rc = zmq_ctx_term (ctx);
assert (rc == 0);
......
......@@ -21,12 +21,15 @@
#include <stdlib.h>
#include "testutil.hpp"
const char *bind_address = 0;
const char *connect_address = 0;
void test_push_round_robin_out (void *ctx)
{
void *push = zmq_socket (ctx, ZMQ_PUSH);
assert (push);
int rc = zmq_bind (push, "inproc://b");
int rc = zmq_bind (push, bind_address);
assert (rc == 0);
const size_t N = 5;
......@@ -40,10 +43,14 @@ void test_push_round_robin_out (void *ctx)
rc = zmq_setsockopt (pulls[i], ZMQ_RCVTIMEO, &timeout, sizeof(int));
assert (rc == 0);
rc = zmq_connect (pulls[i], "inproc://b");
rc = zmq_connect (pulls[i], connect_address);
assert (rc == 0);
}
// Wait for connections.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
// Send 2N messages
for (size_t i = 0; i < N; ++i)
{
......@@ -61,14 +68,16 @@ void test_push_round_robin_out (void *ctx)
s_recv_seq (pulls[i], "DEF", SEQ_END);
}
rc = zmq_close (push);
assert (rc == 0);
close_zero_linger (push);
for (size_t i = 0; i < N; ++i)
{
rc = zmq_close (pulls[i]);
assert (rc == 0);
close_zero_linger (pulls[i]);
}
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
}
void test_pull_fair_queue_in (void *ctx)
......@@ -76,7 +85,7 @@ void test_pull_fair_queue_in (void *ctx)
void *pull = zmq_socket (ctx, ZMQ_PULL);
assert (pull);
int rc = zmq_bind (pull, "inproc://a");
int rc = zmq_bind (pull, bind_address);
assert (rc == 0);
const size_t N = 5;
......@@ -86,38 +95,74 @@ void test_pull_fair_queue_in (void *ctx)
pushs[i] = zmq_socket (ctx, ZMQ_PUSH);
assert (pushs[i]);
rc = zmq_connect (pushs[i], "inproc://a");
rc = zmq_connect (pushs[i], connect_address);
assert (rc == 0);
}
// Wait for connections.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
int first_half = 0;
int second_half = 0;
// Send 2N messages
for (size_t i = 0; i < N; ++i)
{
char * str = strdup("A");
char *str = strdup("A");
str[0] += i;
s_send_seq (pushs[i], str, SEQ_END);
first_half += str[0];
str[0] += N;
s_send_seq (pushs[i], str, SEQ_END);
second_half += str[0];
free (str);
}
// Expect to pull them in order
for (size_t i = 0; i < 2*N; ++i)
// Wait for data.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
// Expect to pull one from each first
for (size_t i = 0; i < N; ++i)
{
char * str = strdup("A");
str[0] += i;
s_recv_seq (pull, str, SEQ_END);
free (str);
rc = zmq_msg_recv (&msg, pull, 0);
assert (rc == 2);
const char *str = (const char *)zmq_msg_data (&msg);
first_half -= str[0];
}
assert (first_half == 0);
rc = zmq_close (pull);
// And then get the second batch
for (size_t i = 0; i < N; ++i)
{
rc = zmq_msg_recv (&msg, pull, 0);
assert (rc == 2);
const char *str = (const char *)zmq_msg_data (&msg);
second_half -= str[0];
}
assert (second_half == 0);
rc = zmq_msg_close (&msg);
assert (rc == 0);
close_zero_linger (pull);
for (size_t i = 0; i < N; ++i)
{
rc = zmq_close (pushs[i]);
assert (rc == 0);
close_zero_linger (pushs[i]);
}
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
}
void test_push_block_on_send_no_peers (void *ctx)
......@@ -150,7 +195,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
int rc = zmq_setsockopt (A, ZMQ_SNDHWM, &hwm, sizeof(hwm));
assert (rc == 0);
rc = zmq_bind (A, "inproc://d");
rc = zmq_bind (A, bind_address);
assert (rc == 0);
void *B = zmq_socket (ctx, ZMQ_PULL);
......@@ -159,7 +204,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
rc = zmq_setsockopt (B, ZMQ_RCVHWM, &hwm, sizeof(hwm));
assert (rc == 0);
rc = zmq_connect (B, "inproc://d");
rc = zmq_connect (B, connect_address);
assert (rc == 0);
// Send two messages, one should be stuck in A's outgoing queue, the other
......@@ -172,13 +217,15 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert (rc == -1);
assert (errno == EAGAIN);
rc = zmq_disconnect (B, "inproc://d");
rc = zmq_disconnect (B, connect_address);
assert (rc == 0);
// Disconnect may take time and need command processing.
zmq_pollitem_t poller[2] = { { A, 0, 0, 0 }, { B, 0, 0, 0 } };
rc = zmq_poll (poller, 2, 100);
assert (rc == 0);
rc = zmq_poll (poller, 2, 100);
assert (rc == 0);
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
......@@ -195,7 +242,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert (errno == EAGAIN);
// Reconnect B
rc = zmq_connect (B, "inproc://d");
rc = zmq_connect (B, connect_address);
assert (rc == 0);
// Still can't receive old data on B.
......@@ -214,10 +261,11 @@ void test_destroy_queue_on_disconnect (void *ctx)
rc = zmq_msg_close (&msg);
assert (rc == 0);
rc = zmq_close (A);
assert (rc == 0);
close_zero_linger (A);
close_zero_linger (B);
rc = zmq_close (B);
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
}
......@@ -226,22 +274,30 @@ int main ()
void *ctx = zmq_ctx_new ();
assert (ctx);
// PUSH: SHALL route outgoing messages to connected peers using a
// round-robin strategy.
test_push_round_robin_out (ctx);
const char *binds[] = { "inproc://a", "tcp://*:5555" };
const char *connects[] = { "inproc://a", "tcp://localhost:5555" };
for (int i = 0; i < 2; ++i) {
bind_address = binds[i];
connect_address = connects[i];
// PULL: SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_pull_fair_queue_in (ctx);
// PUSH: SHALL route outgoing messages to connected peers using a
// round-robin strategy.
test_push_round_robin_out (ctx);
// PUSH: SHALL block on sending, or return a suitable error, when it has no
// available peers.
test_push_block_on_send_no_peers (ctx);
// PULL: SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_pull_fair_queue_in (ctx);
// PUSH and PULL: SHALL create this queue when a peer connects to it. If
// this peer disconnects, the socket SHALL destroy its queue and SHALL
// discard any messages it contains.
test_destroy_queue_on_disconnect (ctx);
// PUSH: SHALL block on sending, or return a suitable error, when it has no
// available peers.
test_push_block_on_send_no_peers (ctx);
// PUSH and PULL: SHALL create this queue when a peer connects to it. If
// this peer disconnects, the socket SHALL destroy its queue and SHALL
// discard any messages it contains.
test_destroy_queue_on_disconnect (ctx);
}
int rc = zmq_ctx_term (ctx);
assert (rc == 0);
......
......@@ -21,6 +21,9 @@
#include <stdlib.h>
#include "testutil.hpp"
const char *bind_address = 0;
const char *connect_address = 0;
void test_fair_queue_in (void *ctx)
{
void *rep = zmq_socket (ctx, ZMQ_REP);
......@@ -30,7 +33,7 @@ void test_fair_queue_in (void *ctx)
int rc = zmq_setsockopt (rep, ZMQ_RCVTIMEO, &timeout, sizeof(int));
assert (rc == 0);
rc = zmq_bind (rep, "inproc://a");
rc = zmq_bind (rep, bind_address);
assert (rc == 0);
const size_t N = 5;
......@@ -43,7 +46,7 @@ void test_fair_queue_in (void *ctx)
rc = zmq_setsockopt (reqs[i], ZMQ_RCVTIMEO, &timeout, sizeof(int));
assert (rc == 0);
rc = zmq_connect (reqs[i], "inproc://a");
rc = zmq_connect (reqs[i], connect_address);
assert (rc == 0);
}
......@@ -77,14 +80,16 @@ void test_fair_queue_in (void *ctx)
free (str);
}
rc = zmq_close (rep);
assert (rc == 0);
close_zero_linger (rep);
for (size_t i = 0; i < N; ++i)
{
rc = zmq_close (reqs[i]);
assert (rc == 0);
close_zero_linger (reqs[i]);
}
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
}
void test_envelope (void *ctx)
......@@ -92,13 +97,13 @@ void test_envelope (void *ctx)
void *rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
int rc = zmq_bind (rep, "inproc://b");
int rc = zmq_bind (rep, bind_address);
assert (rc == 0);
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
rc = zmq_connect (dealer, "inproc://b");
rc = zmq_connect (dealer, connect_address);
assert (rc == 0);
// minimal envelope
......@@ -113,10 +118,11 @@ void test_envelope (void *ctx)
s_send_seq (rep, "A", SEQ_END);
s_recv_seq (dealer, "X", "Y", 0, "A", SEQ_END);
rc = zmq_close (rep);
assert (rc == 0);
close_zero_linger (rep);
close_zero_linger (dealer);
rc = zmq_close (dealer);
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
}
......@@ -125,17 +131,25 @@ int main ()
void *ctx = zmq_ctx_new ();
assert (ctx);
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_fair_queue_in (ctx);
// For an incoming message:
// SHALL remove and store the address envelope, including the delimiter.
// SHALL pass the remaining data frames to its calling application.
// SHALL wait for a single reply message from its calling application.
// SHALL prepend the address envelope and delimiter.
// SHALL deliver this message back to the originating peer.
test_envelope (ctx);
const char *binds[] = { "inproc://a", "tcp://*:5555" };
const char *connects[] = { "inproc://a", "tcp://localhost:5555" };
for (int i = 0; i < 2; ++i) {
bind_address = binds[i];
connect_address = connects[i];
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_fair_queue_in (ctx);
// For an incoming message:
// SHALL remove and store the address envelope, including the delimiter.
// SHALL pass the remaining data frames to its calling application.
// SHALL wait for a single reply message from its calling application.
// SHALL prepend the address envelope and delimiter.
// SHALL deliver this message back to the originating peer.
test_envelope (ctx);
}
int rc = zmq_ctx_term (ctx);
assert (rc == 0);
......
......@@ -20,12 +20,15 @@
#include <stdio.h>
#include "testutil.hpp"
const char *bind_address = 0;
const char *connect_address = 0;
void test_round_robin_out (void *ctx)
{
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
int rc = zmq_bind (req, "inproc://b");
int rc = zmq_bind (req, bind_address);
assert (rc == 0);
const size_t N = 5;
......@@ -39,7 +42,7 @@ void test_round_robin_out (void *ctx)
rc = zmq_setsockopt (rep[i], ZMQ_RCVTIMEO, &timeout, sizeof(int));
assert (rc == 0);
rc = zmq_connect (rep[i], "inproc://b");
rc = zmq_connect (rep[i], connect_address);
assert (rc == 0);
}
......@@ -52,14 +55,16 @@ void test_round_robin_out (void *ctx)
s_recv_seq (req, "DEF", SEQ_END);
}
rc = zmq_close (req);
assert (rc == 0);
close_zero_linger (req);
for (size_t i = 0; i < N; ++i)
{
rc = zmq_close (rep[i]);
assert (rc == 0);
close_zero_linger (rep[i]);
}
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
}
void test_req_only_listens_to_current_peer (void *ctx)
......@@ -70,7 +75,7 @@ void test_req_only_listens_to_current_peer (void *ctx)
int rc = zmq_setsockopt(req, ZMQ_IDENTITY, "A", 2);
assert (rc == 0);
rc = zmq_bind (req, "inproc://c");
rc = zmq_bind (req, bind_address);
assert (rc == 0);
const size_t N = 3;
......@@ -88,7 +93,7 @@ void test_req_only_listens_to_current_peer (void *ctx)
rc = zmq_setsockopt (router[i], ZMQ_ROUTER_MANDATORY, &enabled, sizeof(enabled));
assert (rc == 0);
rc = zmq_connect (router[i], "inproc://c");
rc = zmq_connect (router[i], connect_address);
assert (rc == 0);
}
......@@ -111,14 +116,16 @@ void test_req_only_listens_to_current_peer (void *ctx)
s_recv_seq (req, "GOOD", SEQ_END);
}
rc = zmq_close (req);
assert (rc == 0);
close_zero_linger (req);
for (size_t i = 0; i < N; ++i)
{
rc = zmq_close (router[i]);
assert (rc == 0);
close_zero_linger (router[i]);
}
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
}
void test_req_message_format (void *ctx)
......@@ -129,10 +136,10 @@ void test_req_message_format (void *ctx)
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert (router);
int rc = zmq_bind (req, "inproc://a");
int rc = zmq_bind (req, bind_address);
assert (rc == 0);
rc = zmq_connect (router, "inproc://a");
rc = zmq_connect (router, connect_address);
assert (rc == 0);
// Send a multi-part request.
......@@ -172,10 +179,11 @@ void test_req_message_format (void *ctx)
rc = zmq_msg_close (&peer_id_msg);
assert (rc == 0);
rc = zmq_close (req);
assert (rc == 0);
close_zero_linger (req);
close_zero_linger (router);
rc = zmq_close (router);
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
}
......@@ -205,23 +213,31 @@ int main ()
void *ctx = zmq_ctx_new ();
assert (ctx);
// SHALL route outgoing messages to connected peers using a round-robin
// strategy.
test_round_robin_out (ctx);
const char *binds[] = { "inproc://a", "tcp://*:5555" };
const char *connects[] = { "inproc://a", "tcp://localhost:5555" };
// The request and reply messages SHALL have this format on the wire:
// * A delimiter, consisting of an empty frame, added by the REQ socket.
// * One or more data frames, comprising the message visible to the
// application.
test_req_message_format (ctx);
for (int i = 0; i < 2; ++i) {
bind_address = binds[i];
connect_address = connects[i];
// SHALL block on sending, or return a suitable error, when it has no connected peers.
test_block_on_send_no_peers (ctx);
// SHALL route outgoing messages to connected peers using a round-robin
// strategy.
test_round_robin_out (ctx);
// SHALL accept an incoming message only from the last peer that it sent a
// request to.
// SHALL discard silently any messages received from other peers.
test_req_only_listens_to_current_peer (ctx);
// The request and reply messages SHALL have this format on the wire:
// * A delimiter, consisting of an empty frame, added by the REQ socket.
// * One or more data frames, comprising the message visible to the
// application.
test_req_message_format (ctx);
// SHALL block on sending, or return a suitable error, when it has no connected peers.
test_block_on_send_no_peers (ctx);
// SHALL accept an incoming message only from the last peer that it sent a
// request to.
// SHALL discard silently any messages received from other peers.
test_req_only_listens_to_current_peer (ctx);
}
int rc = zmq_ctx_term (ctx);
assert (rc == 0);
......
......@@ -21,6 +21,9 @@
#include <stdlib.h>
#include "testutil.hpp"
const char *bind_address = 0;
const char *connect_address = 0;
void test_fair_queue_in (void *ctx)
{
void *receiver = zmq_socket (ctx, ZMQ_ROUTER);
......@@ -30,7 +33,7 @@ void test_fair_queue_in (void *ctx)
int rc = zmq_setsockopt (receiver, ZMQ_RCVTIMEO, &timeout, sizeof(int));
assert (rc == 0);
rc = zmq_bind (receiver, "inproc://a");
rc = zmq_bind (receiver, bind_address);
assert (rc == 0);
const size_t N = 5;
......@@ -49,7 +52,7 @@ void test_fair_queue_in (void *ctx)
assert (rc == 0);
free (str);
rc = zmq_connect (senders[i], "inproc://a");
rc = zmq_connect (senders[i], connect_address);
assert (rc == 0);
}
......@@ -63,32 +66,43 @@ void test_fair_queue_in (void *ctx)
s_send_seq (senders[0], "M", SEQ_END);
s_recv_seq (receiver, "A", "M", SEQ_END);
int sum = 0;
// send N requests
for (size_t i = 0; i < N; ++i)
{
s_send_seq (senders[i], "M", SEQ_END);
sum += 'A' + i;
}
assert (sum == N*'A' + N*(N-1)/2);
// handle N requests
for (size_t i = 0; i < N; ++i)
{
char *str = strdup("A");
str[0] += i;
s_recv_seq (receiver, str, "M", SEQ_END);
free (str);
rc = zmq_msg_recv (&msg, receiver, 0);
assert (rc == 2);
const char *id = (const char *)zmq_msg_data (&msg);
sum -= id[0];
s_recv_seq (receiver, "M", SEQ_END);
}
assert (sum == 0);
rc = zmq_msg_close (&msg);
assert (rc == 0);
rc = zmq_close (receiver);
assert (rc == 0);
close_zero_linger (receiver);
for (size_t i = 0; i < N; ++i)
{
rc = zmq_close (senders[i]);
assert (rc == 0);
close_zero_linger (senders[i]);
}
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
}
void test_destroy_queue_on_disconnect (void *ctx)
......@@ -100,7 +114,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
int rc = zmq_setsockopt (A, ZMQ_ROUTER_MANDATORY, &enabled, sizeof(enabled));
assert (rc == 0);
rc = zmq_bind (A, "inproc://d");
rc = zmq_bind (A, bind_address);
assert (rc == 0);
void *B = zmq_socket (ctx, ZMQ_DEALER);
......@@ -109,20 +123,26 @@ void test_destroy_queue_on_disconnect (void *ctx)
rc = zmq_setsockopt (B, ZMQ_IDENTITY, "B", 2);
assert (rc == 0);
rc = zmq_connect (B, "inproc://d");
rc = zmq_connect (B, connect_address);
assert (rc == 0);
// Wait for connection.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
// Send a message in both directions
s_send_seq (A, "B", "ABC", SEQ_END);
s_send_seq (B, "DEF", SEQ_END);
rc = zmq_disconnect (B, "inproc://d");
rc = zmq_disconnect (B, connect_address);
assert (rc == 0);
// Disconnect may take time and need command processing.
zmq_pollitem_t poller[2] = { { A, 0, 0, 0 }, { B, 0, 0, 0 } };
rc = zmq_poll (poller, 2, 100);
assert (rc == 0);
rc = zmq_poll (poller, 2, 100);
assert (rc == 0);
// No messages should be available, sending should fail.
zmq_msg_t msg;
......@@ -137,7 +157,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert (errno == EAGAIN);
// After a reconnect of B, the messages should still be gone
rc = zmq_connect (B, "inproc://d");
rc = zmq_connect (B, connect_address);
assert (rc == 0);
rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT);
......@@ -151,10 +171,11 @@ void test_destroy_queue_on_disconnect (void *ctx)
rc = zmq_msg_close (&msg);
assert (rc == 0);
rc = zmq_close (A);
assert (rc == 0);
close_zero_linger (A);
close_zero_linger (B);
rc = zmq_close (B);
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
}
......@@ -164,14 +185,22 @@ int main ()
void *ctx = zmq_ctx_new ();
assert (ctx);
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_fair_queue_in (ctx);
const char *binds[] = { "inproc://a", "tcp://*:5555" };
const char *connects[] = { "inproc://a", "tcp://localhost:5555" };
for (int i = 0; i < 2; ++i) {
bind_address = binds[i];
connect_address = connects[i];
// SHALL create a double queue when a peer connects to it. If this peer
// disconnects, the ROUTER socket SHALL destroy its double queue and SHALL
// discard any messages it contains.
test_destroy_queue_on_disconnect (ctx);
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_fair_queue_in (ctx);
// SHALL create a double queue when a peer connects to it. If this peer
// disconnects, the ROUTER socket SHALL destroy its double queue and SHALL
// discard any messages it contains.
test_destroy_queue_on_disconnect (ctx);
}
int rc = zmq_ctx_term (ctx);
assert (rc == 0);
......
......@@ -186,4 +186,15 @@ void s_recv_seq (void *socket, ...)
zmq_msg_close (&msg);
}
// Sets a zero linger period on a socket and closes it.
void close_zero_linger (void *socket)
{
int linger = 0;
int rc = zmq_setsockopt (socket, ZMQ_LINGER, &linger, sizeof(linger));
assert (rc == 0);
rc = zmq_close (socket);
assert (rc == 0);
}
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment