Unverified Commit 12005bd9 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3228 from sigiesec/migrate-tests-to-unity

Migrate further tests to unity
parents 9f0b83a1 1a55100d
......@@ -455,7 +455,8 @@ external_unity_libunity_a_SOURCES = external/unity/unity.c \
external/unity/unity_internals.h
tests_test_ancillaries_SOURCES = tests/test_ancillaries.cpp
tests_test_ancillaries_LDADD = src/libzmq.la
tests_test_ancillaries_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_ancillaries_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_system_SOURCES = tests/test_system.cpp
tests_test_system_LDADD = src/libzmq.la
......@@ -468,7 +469,8 @@ tests_test_pair_inproc_LDADD = src/libzmq.la
tests_test_pair_tcp_SOURCES = \
tests/test_pair_tcp.cpp \
tests/testutil.hpp
tests_test_pair_tcp_LDADD = src/libzmq.la
tests_test_pair_tcp_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_pair_tcp_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_reqrep_inproc_SOURCES = \
tests/test_reqrep_inproc.cpp \
......@@ -542,10 +544,12 @@ tests_test_probe_router_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_probe_router_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_stream_SOURCES = tests/test_stream.cpp
tests_test_stream_LDADD = src/libzmq.la
tests_test_stream_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_stream_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_stream_empty_SOURCES = tests/test_stream_empty.cpp
tests_test_stream_empty_LDADD = src/libzmq.la
tests_test_stream_empty_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_stream_empty_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_stream_timeout_SOURCES = tests/test_stream_timeout.cpp
tests_test_stream_timeout_LDADD = src/libzmq.la
......@@ -557,7 +561,8 @@ tests_test_disconnect_inproc_SOURCES = tests/test_disconnect_inproc.cpp
tests_test_disconnect_inproc_LDADD = src/libzmq.la
tests_test_unbind_wildcard_SOURCES = tests/test_unbind_wildcard.cpp
tests_test_unbind_wildcard_LDADD = src/libzmq.la
tests_test_unbind_wildcard_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_unbind_wildcard_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_ctx_options_SOURCES = tests/test_ctx_options.cpp
tests_test_ctx_options_LDADD = src/libzmq.la
......@@ -596,10 +601,12 @@ tests_test_spec_router_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_spec_router_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_spec_pushpull_SOURCES = tests/test_spec_pushpull.cpp
tests_test_spec_pushpull_LDADD = src/libzmq.la
tests_test_spec_pushpull_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_spec_pushpull_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_req_correlate_SOURCES = tests/test_req_correlate.cpp
tests_test_req_correlate_LDADD = src/libzmq.la
tests_test_req_correlate_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_req_correlate_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_req_relaxed_SOURCES = tests/test_req_relaxed.cpp
tests_test_req_relaxed_LDADD = src/libzmq.la
......@@ -645,14 +652,16 @@ tests_test_metadata_SOURCES = tests/test_metadata.cpp
tests_test_metadata_LDADD = src/libzmq.la
tests_test_capabilities_SOURCES = tests/test_capabilities.cpp
tests_test_capabilities_LDADD = src/libzmq.la
tests_test_capabilities_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_capabilities_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_xpub_nodrop_SOURCES = tests/test_xpub_nodrop.cpp
tests_test_xpub_nodrop_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_xpub_nodrop_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_xpub_manual_SOURCES = tests/test_xpub_manual.cpp
tests_test_xpub_manual_LDADD = src/libzmq.la
tests_test_xpub_manual_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_xpub_manual_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_xpub_welcome_msg_SOURCES = tests/test_xpub_welcome_msg.cpp
tests_test_xpub_welcome_msg_LDADD = src/libzmq.la
......@@ -688,7 +697,8 @@ tests_test_bind_after_connect_tcp_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_bind_after_connect_tcp_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_base85_SOURCES = tests/test_base85.cpp
tests_test_base85_LDADD = src/libzmq.la
tests_test_base85_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_base85_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_sodium_SOURCES = tests/test_sodium.cpp
tests_test_sodium_LDADD = src/libzmq.la
......
......@@ -35,16 +35,37 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int main (void)
void setUp ()
{
}
void tearDown ()
{
}
void test_version ()
{
int major, minor, patch;
zmq_version (&major, &minor, &patch);
assert (major == ZMQ_VERSION_MAJOR && minor == ZMQ_VERSION_MINOR
&& patch == ZMQ_VERSION_PATCH);
TEST_ASSERT_EQUAL_INT (ZMQ_VERSION_MAJOR, major);
TEST_ASSERT_EQUAL_INT (ZMQ_VERSION_MINOR, minor);
TEST_ASSERT_EQUAL_INT (ZMQ_VERSION_PATCH, patch);
}
void test_strerrror ()
{
assert (zmq_strerror (EINVAL));
}
int main ()
{
setup_test_environment ();
return 0;
UNITY_BEGIN ();
RUN_TEST (test_version);
RUN_TEST (test_strerrror);
return UNITY_END ();
}
......@@ -28,6 +28,15 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
void setUp ()
{
}
void tearDown ()
{
}
// Test vector: rfc.zeromq.org/spec:32/Z85
void test__zmq_z85_encode__valid__success ()
......@@ -40,17 +49,17 @@ void test__zmq_z85_encode__valid__success ()
char out_encoded[length + 1] = {0};
errno = 0;
assert (zmq_z85_encode (out_encoded, decoded, size) != NULL);
assert (streq (out_encoded, expected));
assert (zmq_errno () == 0);
TEST_ASSERT_NOT_NULL (zmq_z85_encode (out_encoded, decoded, size));
TEST_ASSERT_EQUAL_STRING (expected, out_encoded);
TEST_ASSERT_EQUAL_INT (0, zmq_errno ());
}
// Buffer length must be evenly divisible by 4 or must fail with EINVAL.
void test__zmq_z85_encode__invalid__failure (size_t size_)
{
errno = 0;
assert (zmq_z85_encode (NULL, NULL, size_) == NULL);
assert (zmq_errno () == EINVAL);
TEST_ASSERT_NULL (zmq_z85_encode (NULL, NULL, size_));
TEST_ASSERT_EQUAL_INT (EINVAL, zmq_errno ());
}
// Test vector: rfc.zeromq.org/spec:32/Z85
......@@ -63,9 +72,9 @@ void test__zmq_z85_decode__valid__success ()
uint8_t out_decoded[size] = {0};
errno = 0;
assert (zmq_z85_decode (out_decoded, encoded) != NULL);
assert (zmq_errno () == 0);
assert (memcmp (out_decoded, expected, size) == 0);
TEST_ASSERT_NOT_NULL (zmq_z85_decode (out_decoded, encoded));
TEST_ASSERT_EQUAL_INT (0, zmq_errno ());
TEST_ASSERT_EQUAL_UINT8_ARRAY (expected, out_decoded, size);
}
// Invalid input data must fail with EINVAL.
......@@ -74,8 +83,8 @@ void test__zmq_z85_decode__invalid__failure (const char (&encoded_)[SIZE])
{
uint8_t decoded[SIZE * 4 / 5 + 1];
errno = 0;
assert (zmq_z85_decode (decoded, encoded_) == NULL);
assert (zmq_errno () == EINVAL);
TEST_ASSERT_NULL (zmq_z85_decode (decoded, encoded_));
TEST_ASSERT_EQUAL_INT (EINVAL, zmq_errno ());
}
......@@ -86,14 +95,13 @@ void test__zmq_z85_encode__zmq_z85_decode__roundtrip (
{
char test_data_z85[SIZE * 5 / 4 + 1];
char *res1 = zmq_z85_encode (test_data_z85, test_data_, SIZE);
assert (res1 != NULL);
TEST_ASSERT_NOT_NULL (res1);
uint8_t test_data_decoded[SIZE];
uint8_t *res2 = zmq_z85_decode (test_data_decoded, test_data_z85);
assert (res2 != NULL);
TEST_ASSERT_NOT_NULL (res2);
int res3 = memcmp (test_data_, test_data_decoded, SIZE);
assert (res3 == 0);
TEST_ASSERT_EQUAL_UINT8_ARRAY (test_data_, test_data_decoded, SIZE);
}
// call zmq_z85_encode, then zmq_z85_decode, and compare the results with the original
......@@ -104,61 +112,92 @@ void test__zmq_z85_decode__zmq_z85_encode__roundtrip (
const size_t decoded_size = (SIZE - 1) * 4 / 5;
uint8_t test_data_decoded[decoded_size];
uint8_t *res1 = zmq_z85_decode (test_data_decoded, test_data_);
assert (res1 != NULL);
TEST_ASSERT_NOT_NULL (res1);
char test_data_z85[SIZE];
char *res2 =
zmq_z85_encode (test_data_z85, test_data_decoded, decoded_size);
assert (res2 != NULL);
TEST_ASSERT_NOT_NULL (res2);
int res3 = memcmp (test_data_, test_data_z85, SIZE);
assert (res3 == 0);
TEST_ASSERT_EQUAL_UINT8_ARRAY (test_data_, test_data_z85, SIZE);
}
int main (void)
{
test__zmq_z85_encode__valid__success ();
test__zmq_z85_encode__invalid__failure (1);
test__zmq_z85_encode__invalid__failure (42);
test__zmq_z85_decode__valid__success ();
// String length must be evenly divisible by 5 or must fail with EINVAL.
test__zmq_z85_decode__invalid__failure ("01234567");
test__zmq_z85_decode__invalid__failure ("0");
// decode invalid data with the maximum representable value
test__zmq_z85_decode__invalid__failure ("#####");
// decode invalid data with the minimum value beyond the limit
// "%nSc0" is 0xffffffff
test__zmq_z85_decode__invalid__failure ("%nSc1");
// decode invalid data with an invalid character in the range of valid
// characters
test__zmq_z85_decode__invalid__failure ("####\0047");
// decode invalid data with an invalid character just below the range of valid
// characters
test__zmq_z85_decode__invalid__failure ("####\0200");
// decode invalid data with an invalid character just above the range of valid
// characters
test__zmq_z85_decode__invalid__failure ("####\0037");
// round-trip encoding and decoding with minimum value
{
const uint8_t test_data[] = {0x00, 0x00, 0x00, 0x00};
test__zmq_z85_encode__zmq_z85_decode__roundtrip (test_data);
}
// round-trip encoding and decoding with maximum value
{
const uint8_t test_data[] = {0xff, 0xff, 0xff, 0xff};
test__zmq_z85_encode__zmq_z85_decode__roundtrip (test_data);
#define def_test__zmq_z85_basename(basename, name, param) \
void test__zmq_z85_##basename##_##name () \
{ \
test__zmq_z85_##basename (param); \
}
test__zmq_z85_decode__zmq_z85_encode__roundtrip (
"r^/rM9M=rMToK)63O8dCvd9D<PY<7iGlC+{BiSnG");
#define def_test__zmq_z85_encode__invalid__failure(name, param) \
def_test__zmq_z85_basename (encode__invalid__failure, name, param)
def_test__zmq_z85_encode__invalid__failure (1, 1)
def_test__zmq_z85_encode__invalid__failure (42, 42)
#define def_test__zmq_z85_decode__invalid__failure(name, param) \
def_test__zmq_z85_basename (decode__invalid__failure, name, param)
// String length must be evenly divisible by 5 or must fail with EINVAL.
def_test__zmq_z85_decode__invalid__failure (indivisble_by_5_multiple_chars,
"01234567")
def_test__zmq_z85_decode__invalid__failure (indivisble_by_5_one_char, "0")
return 0;
// decode invalid data with the maximum representable value
def_test__zmq_z85_decode__invalid__failure (max, "#####")
// decode invalid data with the minimum value beyond the limit
// "%nSc0" is 0xffffffff
def_test__zmq_z85_decode__invalid__failure (above_limit, "%nSc1")
// decode invalid data with an invalid character in the range of valid
// characters
def_test__zmq_z85_decode__invalid__failure (char_within, "####\0047")
// decode invalid data with an invalid character just below the range of valid
// characters
def_test__zmq_z85_decode__invalid__failure (char_adjacent_below, "####\0200")
// decode invalid data with an invalid character just above the range of valid
// characters
def_test__zmq_z85_decode__invalid__failure (char_adjacent_above, "####\0037")
#define def_test__encode__zmq_z85_decode__roundtrip(name, param) \
def_test__zmq_z85_basename (encode__zmq_z85_decode__roundtrip, name, param)
const uint8_t test_data_min[] = {0x00, 0x00, 0x00, 0x00};
const uint8_t test_data_max[] = {0xff, 0xff, 0xff, 0xff};
def_test__encode__zmq_z85_decode__roundtrip (min, test_data_min)
def_test__encode__zmq_z85_decode__roundtrip (max, test_data_max)
#define def_test__decode__zmq_z85_encode__roundtrip(name, param) \
def_test__zmq_z85_basename (decode__zmq_z85_encode__roundtrip, name, param)
const char test_data_regular[] = "r^/rM9M=rMToK)63O8dCvd9D<PY<7iGlC+{BiSnG";
def_test__decode__zmq_z85_encode__roundtrip (regular, test_data_regular)
int main ()
{
UNITY_BEGIN ();
RUN_TEST (test__zmq_z85_encode__valid__success);
RUN_TEST (test__zmq_z85_encode__invalid__failure_1);
RUN_TEST (test__zmq_z85_encode__invalid__failure_42);
RUN_TEST (test__zmq_z85_decode__valid__success);
RUN_TEST (
test__zmq_z85_decode__invalid__failure_indivisble_by_5_multiple_chars);
RUN_TEST (test__zmq_z85_decode__invalid__failure_indivisble_by_5_one_char);
RUN_TEST (test__zmq_z85_decode__invalid__failure_max);
RUN_TEST (test__zmq_z85_decode__invalid__failure_above_limit);
RUN_TEST (test__zmq_z85_decode__invalid__failure_char_within);
RUN_TEST (test__zmq_z85_decode__invalid__failure_char_adjacent_below);
RUN_TEST (test__zmq_z85_decode__invalid__failure_char_adjacent_above);
RUN_TEST (test__zmq_z85_encode__zmq_z85_decode__roundtrip_min);
RUN_TEST (test__zmq_z85_encode__zmq_z85_decode__roundtrip_max);
RUN_TEST (test__zmq_z85_decode__zmq_z85_encode__roundtrip_regular);
return UNITY_END ();
}
......@@ -28,55 +28,72 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int main (void)
void setUp ()
{
}
void tearDown ()
{
}
void test_capabilities ()
{
#if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_OPENVMS)
assert (zmq_has ("ipc"));
TEST_ASSERT_TRUE (zmq_has ("ipc"));
#else
assert (!zmq_has ("ipc"));
TEST_ASSERT_TRUE (!zmq_has ("ipc"));
#endif
#if defined(ZMQ_HAVE_OPENPGM)
assert (zmq_has ("pgm"));
TEST_ASSERT_TRUE (zmq_has ("pgm"));
#else
assert (!zmq_has ("pgm"));
TEST_ASSERT_TRUE (!zmq_has ("pgm"));
#endif
#if defined(ZMQ_HAVE_TIPC)
assert (zmq_has ("tipc"));
TEST_ASSERT_TRUE (zmq_has ("tipc"));
#else
assert (!zmq_has ("tipc"));
TEST_ASSERT_TRUE (!zmq_has ("tipc"));
#endif
#if defined(ZMQ_HAVE_NORM)
assert (zmq_has ("norm"));
TEST_ASSERT_TRUE (zmq_has ("norm"));
#else
assert (!zmq_has ("norm"));
TEST_ASSERT_TRUE (!zmq_has ("norm"));
#endif
#if defined(ZMQ_HAVE_CURVE)
assert (zmq_has ("curve"));
TEST_ASSERT_TRUE (zmq_has ("curve"));
#else
assert (!zmq_has ("curve"));
TEST_ASSERT_TRUE (!zmq_has ("curve"));
#endif
#if defined(HAVE_LIBGSSAPI_KRB5)
assert (zmq_has ("gssapi"));
TEST_ASSERT_TRUE (zmq_has ("gssapi"));
#else
assert (!zmq_has ("gssapi"));
TEST_ASSERT_TRUE (!zmq_has ("gssapi"));
#endif
#if defined(ZMQ_HAVE_VMCI)
assert (zmq_has ("vmci"));
TEST_ASSERT_TRUE (zmq_has ("vmci"));
#else
assert (!zmq_has ("vmci"));
TEST_ASSERT_TRUE (!zmq_has ("vmci"));
#endif
#if defined(ZMQ_BUILD_DRAFT_API)
assert (zmq_has ("draft"));
TEST_ASSERT_TRUE (zmq_has ("draft"));
#else
assert (!zmq_has ("draft"));
TEST_ASSERT_TRUE (!zmq_has ("draft"));
#endif
return 0;
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_capabilities);
return UNITY_END ();
}
......@@ -28,6 +28,18 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
typedef void (*extra_func_t) (void *socket_);
......@@ -43,50 +55,47 @@ void set_sockopt_fastpath (void *socket)
void test_pair_tcp (extra_func_t extra_func_ = NULL)
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_PAIR);
assert (sb);
void *sb = test_context_socket (ZMQ_PAIR);
if (extra_func_)
extra_func_ (sb);
int rc = zmq_bind (sb, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
char my_endpoint[MAX_SOCKET_STRING];
bind_loopback_ipv4 (sb, my_endpoint, sizeof my_endpoint);
void *sc = zmq_socket (ctx, ZMQ_PAIR);
assert (sc);
void *sc = test_context_socket (ZMQ_PAIR);
if (extra_func_)
extra_func_ (sc);
rc = zmq_connect (sc, my_endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, my_endpoint));
bounce (sb, sc);
rc = zmq_close (sc);
assert (rc == 0);
test_context_socket_close (sc);
test_context_socket_close (sb);
}
rc = zmq_close (sb);
assert (rc == 0);
void test_pair_tcp_regular ()
{
test_pair_tcp ();
}
rc = zmq_ctx_term (ctx);
assert (rc == 0);
#ifdef ZMQ_BUILD_DRAFT
void test_pair_tcp_fastpath ()
{
test_pair_tcp (set_sockopt_fastpath);
}
#endif
int main (void)
int main ()
{
setup_test_environment ();
test_pair_tcp ();
UNITY_BEGIN ();
RUN_TEST (test_pair_tcp_regular);
#ifdef ZMQ_BUILD_DRAFT
test_pair_tcp (set_sockopt_fastpath);
RUN_TEST (test_pair_tcp_fastpath);
#endif
return 0;
return UNITY_END ();
}
......@@ -28,36 +28,35 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int main (void)
void setUp ()
{
setup_test_environment ();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
setup_test_context ();
}
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
void tearDown ()
{
teardown_test_context ();
}
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert (router);
void test_req_correlate ()
{
void *req = test_context_socket (ZMQ_REQ);
void *router = test_context_socket (ZMQ_ROUTER);
int enabled = 1;
int rc = zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int)));
int rcvtimeo = 100;
rc = zmq_setsockopt (req, ZMQ_RCVTIMEO, &rcvtimeo, sizeof (int));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (req, ZMQ_RCVTIMEO, &rcvtimeo, sizeof (int)));
rc = zmq_bind (router, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
char my_endpoint[MAX_SOCKET_STRING];
bind_loopback_ipv4 (router, my_endpoint, sizeof my_endpoint);
rc = zmq_connect (req, my_endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (req, my_endpoint));
// Send a multi-part request.
s_send_seq (req, "ABC", "DEF", SEQ_END);
......@@ -66,33 +65,31 @@ int main (void)
zmq_msg_init (&msg);
// Receive peer routing id
rc = zmq_msg_recv (&msg, router, 0);
assert (rc != -1);
assert (zmq_msg_size (&msg) > 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, router, 0));
TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&msg));
zmq_msg_t peer_id_msg;
zmq_msg_init (&peer_id_msg);
zmq_msg_copy (&peer_id_msg, &msg);
int more = 0;
size_t more_size = sizeof (more);
rc = zmq_getsockopt (router, ZMQ_RCVMORE, &more, &more_size);
assert (rc == 0);
assert (more);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (router, ZMQ_RCVMORE, &more, &more_size));
TEST_ASSERT_TRUE (more);
// Receive request id 1
rc = zmq_msg_recv (&msg, router, 0);
assert (rc != -1);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, router, 0));
assert (zmq_msg_size (&msg) == sizeof (uint32_t));
uint32_t req_id = *static_cast<uint32_t *> (zmq_msg_data (&msg));
const uint32_t req_id = *static_cast<uint32_t *> (zmq_msg_data (&msg));
zmq_msg_t req_id_msg;
zmq_msg_init (&req_id_msg);
zmq_msg_copy (&req_id_msg, &msg);
more = 0;
more_size = sizeof (more);
rc = zmq_getsockopt (router, ZMQ_RCVMORE, &more, &more_size);
assert (rc == 0);
assert (more);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (router, ZMQ_RCVMORE, &more, &more_size));
TEST_ASSERT_TRUE (more);
// Receive the rest.
s_recv_seq (router, 0, "ABC", "DEF", SEQ_END);
......@@ -101,40 +98,35 @@ int main (void)
// Send back a bad reply: wrong req id, 0, data
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (&msg, router, ZMQ_SNDMORE));
zmq_msg_init_data (&msg, &bad_req_id, sizeof (uint32_t), NULL, NULL);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (&msg, router, ZMQ_SNDMORE));
s_send_seq (router, 0, "DATA", SEQ_END);
// Send back a good reply: good req id, 0, data
zmq_msg_copy (&msg, &peer_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (&msg, router, ZMQ_SNDMORE));
zmq_msg_copy (&msg, &req_id_msg);
rc = zmq_msg_send (&msg, router, ZMQ_SNDMORE);
assert (rc != -1);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (&msg, router, ZMQ_SNDMORE));
s_send_seq (router, 0, "GHI", SEQ_END);
// Receive reply. If bad reply got through, we wouldn't see
// this particular data.
s_recv_seq (req, "GHI", SEQ_END);
rc = zmq_msg_close (&msg);
assert (rc == 0);
rc = zmq_msg_close (&peer_id_msg);
assert (rc == 0);
rc = zmq_msg_close (&req_id_msg);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_id_msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&req_id_msg));
close_zero_linger (req);
close_zero_linger (router);
test_context_socket_close_zero_linger (req);
test_context_socket_close_zero_linger (router);
}
rc = zmq_ctx_term (ctx);
assert (rc == 0);
int main ()
{
setup_test_environment ();
return 0;
UNITY_BEGIN ();
RUN_TEST (test_req_correlate);
return UNITY_END ();
}
......@@ -28,33 +28,40 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
const char *bind_address = 0;
char connect_address[MAX_SOCKET_STRING];
void test_push_round_robin_out (void *ctx_)
// PUSH: SHALL route outgoing messages to connected peers using a
// round-robin strategy.
void test_push_round_robin_out (const char *bind_address_)
{
void *push = zmq_socket (ctx_, ZMQ_PUSH);
assert (push);
void *push = test_context_socket (ZMQ_PUSH);
int rc = zmq_bind (push, bind_address);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (push, bind_address_));
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, connect_address, &len));
const size_t services = 5;
void *pulls[services];
for (size_t peer = 0; peer < services; ++peer) {
pulls[peer] = zmq_socket (ctx_, ZMQ_PULL);
assert (pulls[peer]);
pulls[peer] = test_context_socket (ZMQ_PULL);
int timeout = 250;
rc = zmq_setsockopt (pulls[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0);
rc = zmq_connect (pulls[peer], connect_address);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pulls[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pulls[peer], connect_address));
}
// Wait for connections.
......@@ -72,34 +79,29 @@ void test_push_round_robin_out (void *ctx_)
s_recv_seq (pulls[peer], "DEF", SEQ_END);
}
close_zero_linger (push);
test_context_socket_close_zero_linger (push);
for (size_t peer = 0; peer < services; ++peer)
close_zero_linger (pulls[peer]);
// Wait for disconnects.
msleep (SETTLE_TIME);
test_context_socket_close_zero_linger (pulls[peer]);
}
void test_pull_fair_queue_in (void *ctx_)
// PULL: SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
void test_pull_fair_queue_in (const char *bind_address_)
{
void *pull = zmq_socket (ctx_, ZMQ_PULL);
assert (pull);
void *pull = test_context_socket (ZMQ_PULL);
int rc = zmq_bind (pull, bind_address);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pull, bind_address_));
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, connect_address, &len));
const unsigned char services = 5;
void *pushs[services];
for (unsigned char peer = 0; peer < services; ++peer) {
pushs[peer] = zmq_socket (ctx_, ZMQ_PUSH);
assert (pushs[peer]);
pushs[peer] = test_context_socket (ZMQ_PUSH);
rc = zmq_connect (pushs[peer], connect_address);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pushs[peer], connect_address));
}
// Wait for connections.
......@@ -127,83 +129,72 @@ void test_pull_fair_queue_in (void *ctx_)
msleep (SETTLE_TIME);
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
// Expect to pull one from each first
for (size_t peer = 0; peer < services; ++peer) {
rc = zmq_msg_recv (&msg, pull, 0);
assert (rc == 2);
TEST_ASSERT_EQUAL_INT (
2, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, pull, 0)));
const char *str = (const char *) zmq_msg_data (&msg);
first_half -= str[0];
}
assert (first_half == 0);
TEST_ASSERT_EQUAL_INT (0, first_half);
// And then get the second batch
for (size_t peer = 0; peer < services; ++peer) {
rc = zmq_msg_recv (&msg, pull, 0);
assert (rc == 2);
TEST_ASSERT_EQUAL_INT (
2, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, pull, 0)));
const char *str = (const char *) zmq_msg_data (&msg);
second_half -= str[0];
}
assert (second_half == 0);
TEST_ASSERT_EQUAL_INT (0, second_half);
rc = zmq_msg_close (&msg);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
close_zero_linger (pull);
test_context_socket_close_zero_linger (pull);
for (size_t peer = 0; peer < services; ++peer)
close_zero_linger (pushs[peer]);
// Wait for disconnects.
msleep (SETTLE_TIME);
test_context_socket_close_zero_linger (pushs[peer]);
}
void test_push_block_on_send_no_peers (void *ctx_)
// PUSH: SHALL block on sending, or return a suitable error, when it has no
// available peers.
void test_push_block_on_send_no_peers (const char *bind_address_)
{
void *sc = zmq_socket (ctx_, ZMQ_PUSH);
assert (sc);
void *sc = test_context_socket (ZMQ_PUSH);
int timeout = 250;
int rc = zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout));
assert (rc == 0);
rc = zmq_send (sc, 0, 0, ZMQ_DONTWAIT);
assert (rc == -1);
assert (errno == EAGAIN);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout)));
rc = zmq_send (sc, 0, 0, 0);
assert (rc == -1);
assert (errno == EAGAIN);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, ZMQ_DONTWAIT));
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, 0));
rc = zmq_close (sc);
assert (rc == 0);
test_context_socket_close (sc);
}
void test_destroy_queue_on_disconnect (void *ctx_)
// PUSH and PULL: SHALL create this queue when a peer connects to it. If
// this peer disconnects, the socket SHALL destroy its queue and SHALL
// discard any messages it contains.
void test_destroy_queue_on_disconnect (const char *bind_address_)
{
void *a = zmq_socket (ctx_, ZMQ_PUSH);
assert (a);
void *a = test_context_socket (ZMQ_PUSH);
int hwm = 1;
int rc = zmq_setsockopt (a, ZMQ_SNDHWM, &hwm, sizeof (hwm));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (a, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
rc = zmq_bind (a, bind_address);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (a, bind_address_));
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (a, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (a, ZMQ_LAST_ENDPOINT, connect_address, &len));
void *b = zmq_socket (ctx_, ZMQ_PULL);
assert (b);
void *b = test_context_socket (ZMQ_PULL);
rc = zmq_setsockopt (b, ZMQ_RCVHWM, &hwm, sizeof (hwm));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (b, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
rc = zmq_connect (b, connect_address);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address));
// Send two messages, one should be stuck in A's outgoing queue, the other
// arrives at B.
......@@ -211,93 +202,79 @@ void test_destroy_queue_on_disconnect (void *ctx_)
s_send_seq (a, "DEF", SEQ_END);
// Both queues should now be full, indicated by A blocking on send.
rc = zmq_send (a, 0, 0, ZMQ_DONTWAIT);
assert (rc == -1);
assert (errno == EAGAIN);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (a, 0, 0, ZMQ_DONTWAIT));
rc = zmq_disconnect (b, connect_address);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (b, connect_address));
// Disconnect may take time and need command processing.
zmq_pollitem_t poller[2] = {{a, 0, 0, 0}, {b, 0, 0, 0}};
rc = zmq_poll (poller, 2, 100);
assert (rc == 0);
rc = zmq_poll (poller, 2, 100);
assert (rc == 0);
TEST_ASSERT_EQUAL_INT (
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100)));
TEST_ASSERT_EQUAL_INT (
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100)));
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
// Can't receive old data on B.
rc = zmq_msg_recv (&msg, b, ZMQ_DONTWAIT);
assert (rc == -1);
assert (errno == EAGAIN);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, b, ZMQ_DONTWAIT));
// Sending fails.
rc = zmq_send (a, 0, 0, ZMQ_DONTWAIT);
assert (rc == -1);
assert (errno == EAGAIN);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (a, 0, 0, ZMQ_DONTWAIT));
// Reconnect B
rc = zmq_connect (b, connect_address);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address));
// Still can't receive old data on B.
rc = zmq_msg_recv (&msg, b, ZMQ_DONTWAIT);
assert (rc == -1);
assert (errno == EAGAIN);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, b, ZMQ_DONTWAIT));
// two messages should be sendable before the queues are filled up.
s_send_seq (a, "ABC", SEQ_END);
s_send_seq (a, "DEF", SEQ_END);
rc = zmq_send (a, 0, 0, ZMQ_DONTWAIT);
assert (rc == -1);
assert (errno == EAGAIN);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (a, 0, 0, ZMQ_DONTWAIT));
rc = zmq_msg_close (&msg);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
close_zero_linger (a);
close_zero_linger (b);
// Wait for disconnects.
msleep (SETTLE_TIME);
test_context_socket_close_zero_linger (a);
test_context_socket_close_zero_linger (b);
}
int main (void)
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
const char *binds[] = {"inproc://a", "tcp://127.0.0.1:*"};
for (int transport = 0; transport < 2; ++transport) {
bind_address = binds[transport];
// PUSH: SHALL route outgoing messages to connected peers using a
// round-robin strategy.
test_push_round_robin_out (ctx);
// PULL: SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_pull_fair_queue_in (ctx);
#define def_test_spec_pushpull(name, bind_address_) \
void test_spec_pushpull_##name##_push_round_robin_out () \
{ \
test_push_round_robin_out (bind_address_); \
} \
void test_spec_pushpull_##name##_pull_fair_queue_in () \
{ \
test_pull_fair_queue_in (bind_address_); \
} \
void test_spec_pushpull_##name##_push_block_on_send_no_peers () \
{ \
test_push_block_on_send_no_peers (bind_address_); \
} \
void test_spec_pushpull_##name##_destroy_queue_on_disconnect () \
{ \
test_destroy_queue_on_disconnect (bind_address_); \
}
// PUSH: SHALL block on sending, or return a suitable error, when it has no
// available peers.
test_push_block_on_send_no_peers (ctx);
def_test_spec_pushpull (inproc, "inproc://a")
// PUSH and PULL: SHALL create this queue when a peer connects to it. If
// this peer disconnects, the socket SHALL destroy its queue and SHALL
// discard any messages it contains.
// *** Test disabled until libzmq does this properly ***
// test_destroy_queue_on_disconnect (ctx);
}
def_test_spec_pushpull (tcp, "tcp://127.0.0.1:*")
int rc = zmq_ctx_term (ctx);
assert (rc == 0);
int main ()
{
setup_test_environment ();
return 0;
UNITY_BEGIN ();
RUN_TEST (test_spec_pushpull_inproc_push_round_robin_out);
RUN_TEST (test_spec_pushpull_tcp_push_round_robin_out);
RUN_TEST (test_spec_pushpull_inproc_pull_fair_queue_in);
RUN_TEST (test_spec_pushpull_tcp_pull_fair_queue_in);
RUN_TEST (test_spec_pushpull_inproc_push_block_on_send_no_peers);
RUN_TEST (test_spec_pushpull_tcp_push_block_on_send_no_peers);
// TODO Tests disabled until libzmq does this properly
//RUN_TEST (test_spec_pushpull_inproc_destroy_queue_on_disconnect);
//RUN_TEST (test_spec_pushpull_tcp_destroy_queue_on_disconnect);
return UNITY_END ();
}
......@@ -28,10 +28,21 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
// ZMTP protocol greeting structure
typedef unsigned char byte;
typedef uint8_t byte;
typedef struct
{
byte signature[10]; // 0xFF 8*0x00 0x7F
......@@ -49,113 +60,97 @@ typedef struct
static zmtp_greeting_t greeting = {
{0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F}, {3, 0}, {'N', 'U', 'L', 'L'}, 0, {0}};
static void test_stream_to_dealer (void)
static void test_stream_to_dealer ()
{
int rc;
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
// Set up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);
// We'll be using this socket in raw mode
void *stream = zmq_socket (ctx, ZMQ_STREAM);
assert (stream);
void *stream = test_context_socket (ZMQ_STREAM);
int zero = 0;
rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero)));
int enabled = 1;
rc = zmq_setsockopt (stream, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_bind (stream, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (stream, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (stream, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
bind_loopback_ipv4 (stream, my_endpoint, sizeof my_endpoint);
// We'll be using this socket as the other peer
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
rc = zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_connect (dealer, my_endpoint);
void *dealer = test_context_socket (ZMQ_DEALER);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, my_endpoint));
// Send a message on the dealer socket
rc = zmq_send (dealer, "Hello", 5, 0);
assert (rc == 5);
send_string_expect_success (dealer, "Hello", 0);
// Connecting sends a zero message
// First frame is routing id
zmq_msg_t routing_id;
rc = zmq_msg_init (&routing_id);
assert (rc == 0);
rc = zmq_msg_recv (&routing_id, stream, 0);
assert (rc > 0);
assert (zmq_msg_more (&routing_id));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&routing_id));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0));
TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
// Verify the existence of Peer-Address metadata
char const *peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
assert (peer_address != 0);
assert (streq (peer_address, "127.0.0.1"));
TEST_ASSERT_NOT_NULL (peer_address);
TEST_ASSERT_EQUAL_STRING ("127.0.0.1", peer_address);
// Second frame is zero
byte buffer[255];
rc = zmq_recv (stream, buffer, 255, 0);
assert (rc == 0);
TEST_ASSERT_EQUAL_INT (
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (stream, buffer, 255, 0)));
// Verify the existence of Peer-Address metadata
peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
assert (peer_address != 0);
assert (streq (peer_address, "127.0.0.1"));
TEST_ASSERT_NOT_NULL (peer_address);
TEST_ASSERT_EQUAL_STRING ("127.0.0.1", peer_address);
// Real data follows
// First frame is routing id
rc = zmq_msg_recv (&routing_id, stream, 0);
assert (rc > 0);
assert (zmq_msg_more (&routing_id));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0));
TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
// Verify the existence of Peer-Address metadata
peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
assert (peer_address != 0);
assert (streq (peer_address, "127.0.0.1"));
TEST_ASSERT_NOT_NULL (peer_address);
TEST_ASSERT_EQUAL_STRING ("127.0.0.1", peer_address);
// Second frame is greeting signature
rc = zmq_recv (stream, buffer, 255, 0);
assert (rc == 10);
assert (memcmp (buffer, greeting.signature, 10) == 0);
recv_array_expect_success (stream, greeting.signature, 0);
// Send our own protocol greeting
rc = zmq_msg_send (&routing_id, stream, ZMQ_SNDMORE);
assert (rc > 0);
rc = zmq_send (stream, &greeting, sizeof (greeting), 0);
assert (rc == sizeof (greeting));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (&routing_id, stream, ZMQ_SNDMORE));
TEST_ASSERT_EQUAL_INT (
sizeof (greeting), TEST_ASSERT_SUCCESS_ERRNO (
zmq_send (stream, &greeting, sizeof (greeting), 0)));
// Now we expect the data from the DEALER socket
// We want the rest of greeting along with the Ready command
int bytes_read = 0;
while (bytes_read < 97) {
// First frame is the routing id of the connection (each time)
rc = zmq_msg_recv (&routing_id, stream, 0);
assert (rc > 0);
assert (zmq_msg_more (&routing_id));
TEST_ASSERT_GREATER_THAN_INT (
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)));
TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
// Second frame contains the next chunk of data
rc = zmq_recv (stream, buffer + bytes_read, 255 - bytes_read, 0);
assert (rc >= 0);
TEST_ASSERT_SUCCESS_ERRNO (
rc = zmq_recv (stream, buffer + bytes_read, 255 - bytes_read, 0));
bytes_read += rc;
}
// First two bytes are major and minor version numbers.
assert (buffer[0] == 3); // ZMTP/3.0
assert (buffer[1] == 0);
TEST_ASSERT_EQUAL_INT (3, buffer[0]); // ZMTP/3.0
TEST_ASSERT_EQUAL_INT (0, buffer[1]);
// Mechanism is "NULL"
assert (memcmp (buffer + 2, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
== 0);
assert (memcmp (buffer + 54, "\4\51\5READY", 8) == 0);
assert (memcmp (buffer + 62, "\13Socket-Type\0\0\0\6DEALER", 22) == 0);
assert (memcmp (buffer + 84, "\10Identity\0\0\0\0", 13) == 0);
TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 2,
"NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20);
TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 54, "\4\51\5READY", 8);
TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 62, "\13Socket-Type\0\0\0\6DEALER",
22);
TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 84, "\10Identity\0\0\0\0", 13);
// Announce we are ready
memcpy (buffer, "\4\51\5READY", 8);
......@@ -163,37 +158,36 @@ static void test_stream_to_dealer (void)
memcpy (buffer + 30, "\10Identity\0\0\0\0", 13);
// Send Ready command
rc = zmq_msg_send (&routing_id, stream, ZMQ_SNDMORE);
assert (rc > 0);
rc = zmq_send (stream, buffer, 43, 0);
assert (rc == 43);
TEST_ASSERT_GREATER_THAN_INT (0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (
&routing_id, stream, ZMQ_SNDMORE)));
TEST_ASSERT_EQUAL_INT (
43, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (stream, buffer, 43, 0)));
// Now we expect the data from the DEALER socket
// First frame is, again, the routing id of the connection
rc = zmq_msg_recv (&routing_id, stream, 0);
assert (rc > 0);
assert (zmq_msg_more (&routing_id));
TEST_ASSERT_GREATER_THAN_INT (
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)));
TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
// Third frame contains Hello message from DEALER
rc = zmq_recv (stream, buffer, sizeof buffer, 0);
assert (rc == 7);
TEST_ASSERT_EQUAL_INT (7, TEST_ASSERT_SUCCESS_ERRNO (
zmq_recv (stream, buffer, sizeof buffer, 0)));
// Then we have a 5-byte message "Hello"
assert (buffer[0] == 0); // Flags = 0
assert (buffer[1] == 5); // Size = 5
assert (memcmp (buffer + 2, "Hello", 5) == 0);
TEST_ASSERT_EQUAL_INT (0, buffer[0]); // Flags = 0
TEST_ASSERT_EQUAL_INT (5, buffer[1]); // Size = 5
TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 2, "Hello", 5);
// Send "World" back to DEALER
rc = zmq_msg_send (&routing_id, stream, ZMQ_SNDMORE);
assert (rc > 0);
TEST_ASSERT_GREATER_THAN_INT (0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (
&routing_id, stream, ZMQ_SNDMORE)));
byte world[] = {0, 5, 'W', 'o', 'r', 'l', 'd'};
rc = zmq_send (stream, world, sizeof (world), 0);
assert (rc == sizeof (world));
TEST_ASSERT_EQUAL_INT (
sizeof (world),
TEST_ASSERT_SUCCESS_ERRNO (zmq_send (stream, world, sizeof (world), 0)));
// Expect response on DEALER socket
rc = zmq_recv (dealer, buffer, 255, 0);
assert (rc == 5);
assert (memcmp (buffer, "World", 5) == 0);
recv_string_expect_success (dealer, "World", 0);
// Test large messages over STREAM socket
#define size 64000
......@@ -206,110 +200,92 @@ static void test_stream_to_dealer (void)
bytes_read = 0;
while (bytes_read < 9 + size) {
// Get routing id frame
rc = zmq_recv (stream, buffer, 256, 0);
assert (rc > 0);
TEST_ASSERT_GREATER_THAN_INT (
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (stream, buffer, 256, 0)));
// Get next chunk
rc = zmq_recv (stream, msgin + bytes_read, 9 + size - bytes_read, 0);
assert (rc > 0);
TEST_ASSERT_GREATER_THAN_INT (
0,
TEST_ASSERT_SUCCESS_ERRNO (rc = zmq_recv (stream, msgin + bytes_read,
9 + size - bytes_read, 0)));
bytes_read += rc;
}
int byte_nbr;
for (byte_nbr = 0; byte_nbr < size; byte_nbr++) {
if (msgin[9 + byte_nbr] != 0xAB)
assert (false);
for (int byte_nbr = 0; byte_nbr < size; byte_nbr++) {
TEST_ASSERT_EQUAL_UINT8 (0xAB, msgin[9 + byte_nbr]);
}
rc = zmq_close (dealer);
assert (rc == 0);
rc = zmq_close (stream);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
test_context_socket_close (dealer);
test_context_socket_close (stream);
}
static void test_stream_to_stream (void)
static void test_stream_to_stream ()
{
int rc;
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
// Set-up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);
void *server = zmq_socket (ctx, ZMQ_STREAM);
assert (server);
void *server = test_context_socket (ZMQ_STREAM);
int enabled = 1;
rc = zmq_setsockopt (server, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
void *client = zmq_socket (ctx, ZMQ_STREAM);
assert (client);
rc = zmq_setsockopt (client, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (server, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
bind_loopback_ipv4 (server, my_endpoint, sizeof my_endpoint);
void *client = test_context_socket (ZMQ_STREAM);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (client, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint));
uint8_t id[256];
size_t id_size = 256;
uint8_t buffer[256];
// Connecting sends a zero message
// Server: First frame is routing id, second frame is zero
id_size = zmq_recv (server, id, 256, 0);
assert (id_size > 0);
rc = zmq_recv (server, buffer, 256, 0);
assert (rc == 0);
TEST_ASSERT_GREATER_THAN_INT (
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, id, 256, 0)));
TEST_ASSERT_EQUAL_INT (
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, buffer, 256, 0)));
// Client: First frame is routing id, second frame is zero
id_size = zmq_recv (client, id, 256, 0);
assert (id_size > 0);
rc = zmq_recv (client, buffer, 256, 0);
assert (rc == 0);
TEST_ASSERT_GREATER_THAN_INT (
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, id, 256, 0)));
TEST_ASSERT_EQUAL_INT (
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, buffer, 256, 0)));
// Sent HTTP request on client socket
// Get server routing id
rc = zmq_getsockopt (client, ZMQ_ROUTING_ID, id, &id_size);
assert (rc == 0);
size_t id_size = sizeof id;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (client, ZMQ_ROUTING_ID, id, &id_size));
// First frame is server routing id
rc = zmq_send (client, id, id_size, ZMQ_SNDMORE);
assert (rc == (int) id_size);
TEST_ASSERT_EQUAL_INT ((int) id_size, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (
client, id, id_size, ZMQ_SNDMORE)));
// Second frame is HTTP GET request
rc = zmq_send (client, "GET /\n\n", 7, 0);
assert (rc == 7);
TEST_ASSERT_EQUAL_INT (
7, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (client, "GET /\n\n", 7, 0)));
// Get HTTP request; ID frame and then request
id_size = zmq_recv (server, id, 256, 0);
assert (id_size > 0);
rc = zmq_recv (server, buffer, 256, 0);
assert (rc != -1);
assert (memcmp (buffer, "GET /\n\n", 7) == 0);
TEST_ASSERT_GREATER_THAN_INT (
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, id, 256, 0)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, buffer, 256, 0));
TEST_ASSERT_EQUAL_INT8_ARRAY (buffer, "GET /\n\n", 7);
// Send reply back to client
char http_response[] = "HTTP/1.0 200 OK\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"Hello, World!";
rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
assert (rc != -1);
rc = zmq_send (server, http_response, sizeof (http_response), ZMQ_SNDMORE);
assert (rc != -1);
TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, id, id_size, ZMQ_SNDMORE));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_send (server, http_response, sizeof (http_response), ZMQ_SNDMORE));
// Send zero to close connection to client
rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
assert (rc != -1);
rc = zmq_send (server, NULL, 0, ZMQ_SNDMORE);
assert (rc != -1);
TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, id, id_size, ZMQ_SNDMORE));
TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, NULL, 0, ZMQ_SNDMORE));
// Get reply at client and check that it's complete
id_size = zmq_recv (client, id, 256, 0);
assert (id_size > 0);
rc = zmq_recv (client, buffer, 256, 0);
assert (rc == sizeof (http_response));
assert (memcmp (buffer, http_response, sizeof (http_response)) == 0);
TEST_ASSERT_GREATER_THAN_INT (
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, id, 256, 0)));
TEST_ASSERT_EQUAL_INT (
sizeof http_response,
TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, buffer, 256, 0)));
TEST_ASSERT_EQUAL_INT8_ARRAY (buffer, http_response,
sizeof (http_response));
// // Get disconnection notification
// FIXME: why does this block? Bug in STREAM disconnect notification?
......@@ -318,19 +294,16 @@ static void test_stream_to_stream (void)
// rc = zmq_recv (client, buffer, 256, 0);
// assert (rc == 0);
rc = zmq_close (server);
assert (rc == 0);
rc = zmq_close (client);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
test_context_socket_close (server);
test_context_socket_close (client);
}
int main (void)
int main ()
{
setup_test_environment ();
test_stream_to_dealer ();
test_stream_to_stream ();
UNITY_BEGIN ();
RUN_TEST (test_stream_to_dealer);
RUN_TEST (test_stream_to_stream);
return UNITY_END ();
}
......@@ -28,48 +28,52 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int main (void)
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
void test_stream_empty ()
{
setup_test_environment ();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *stream = zmq_socket (ctx, ZMQ_STREAM);
assert (stream);
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
int rc = zmq_bind (stream, "tcp://127.0.0.1:*");
assert (rc >= 0);
rc = zmq_getsockopt (stream, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
rc = zmq_connect (dealer, my_endpoint);
assert (rc >= 0);
zmq_send (dealer, "", 0, 0);
void *stream = test_context_socket (ZMQ_STREAM);
void *dealer = test_context_socket (ZMQ_DEALER);
bind_loopback_ipv4 (stream, my_endpoint, sizeof my_endpoint);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, my_endpoint));
send_string_expect_success (dealer, "", 0);
zmq_msg_t ident, empty;
zmq_msg_init (&ident);
rc = zmq_msg_recv (&ident, stream, 0);
assert (rc >= 0);
rc = zmq_msg_init_data (&empty, (void *) "", 0, NULL, NULL);
assert (rc >= 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&ident, stream, 0));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_init_data (&empty, (void *) "", 0, NULL, NULL));
rc = zmq_msg_send (&ident, stream, ZMQ_SNDMORE);
assert (rc >= 0);
rc = zmq_msg_close (&ident);
assert (rc >= 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (&ident, stream, ZMQ_SNDMORE));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&ident));
rc = zmq_msg_send (&empty, stream, 0);
assert (rc >= 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (&empty, stream, 0));
// This close used to fail with Bad Address
rc = zmq_msg_close (&empty);
assert (rc >= 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&empty));
test_context_socket_close_zero_linger (dealer);
test_context_socket_close_zero_linger (stream);
}
int main ()
{
setup_test_environment ();
close_zero_linger (dealer);
close_zero_linger (stream);
zmq_ctx_term (ctx);
UNITY_BEGIN ();
RUN_TEST (test_stream_empty);
return UNITY_END ();
}
......@@ -18,28 +18,31 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int main (void)
void setUp ()
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
int ipv6 = is_ipv6_available ();
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
void test_address_wildcard_ipv4 ()
{
/* Address wildcard, IPv6 disabled */
void *sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
void *sb = test_context_socket (ZMQ_REP);
void *sc = test_context_socket (ZMQ_REQ);
int rc = zmq_bind (sb, "tcp://*:*");
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "tcp://*:*"));
char bind_endpoint[256];
char connect_endpoint[256];
size_t endpoint_len = sizeof (bind_endpoint);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, bind_endpoint, &endpoint_len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, bind_endpoint, &endpoint_len));
// Apparently Windows can't connect to 0.0.0.0. A better fix would be welcome.
#ifdef ZMQ_HAVE_WINDOWS
......@@ -49,39 +52,38 @@ int main (void)
strcpy (connect_endpoint, bind_endpoint);
#endif
rc = zmq_connect (sc, connect_endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, connect_endpoint));
bounce (sb, sc);
rc = zmq_disconnect (sc, connect_endpoint);
assert (rc == 0);
rc = zmq_unbind (sb, bind_endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (sc, connect_endpoint));
TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (sb, bind_endpoint));
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
test_context_socket_close (sc);
test_context_socket_close (sb);
}
void test_address_wildcard_ipv6 ()
{
int ipv6 = is_ipv6_available ();
/* Address wildcard, IPv6 enabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
void *sb = test_context_socket (ZMQ_REP);
void *sc = test_context_socket (ZMQ_REQ);
rc = zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int)));
rc = zmq_bind (sb, "tcp://*:*");
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "tcp://*:*"));
endpoint_len = sizeof (bind_endpoint);
char bind_endpoint[256];
char connect_endpoint[256];
size_t endpoint_len = sizeof (bind_endpoint);
memset (bind_endpoint, 0, endpoint_len);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, bind_endpoint, &endpoint_len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, bind_endpoint, &endpoint_len));
#ifdef ZMQ_HAVE_WINDOWS
if (ipv6)
......@@ -94,123 +96,116 @@ int main (void)
strcpy (connect_endpoint, bind_endpoint);
#endif
rc = zmq_connect (sc, connect_endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, connect_endpoint));
bounce (sb, sc);
rc = zmq_disconnect (sc, connect_endpoint);
assert (rc == 0);
rc = zmq_unbind (sb, bind_endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (sc, connect_endpoint));
TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (sb, bind_endpoint));
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
test_context_socket_close (sc);
test_context_socket_close (sb);
}
void test_port_wildcard_ipv4_address ()
{
/* Port wildcard, IPv4 address, IPv6 disabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
void *sb = test_context_socket (ZMQ_REP);
void *sc = test_context_socket (ZMQ_REQ);
rc = zmq_bind (sb, "tcp://127.0.0.1:*");
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "tcp://127.0.0.1:*"));
char endpoint[256];
endpoint_len = sizeof (endpoint);
size_t endpoint_len = sizeof (endpoint);
memset (endpoint, 0, endpoint_len);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len));
rc = zmq_connect (sc, endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint));
bounce (sb, sc);
rc = zmq_disconnect (sc, endpoint);
assert (rc == 0);
rc = zmq_unbind (sb, endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (sc, endpoint));
TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (sb, endpoint));
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
test_context_socket_close (sc);
test_context_socket_close (sb);
}
void test_port_wildcard_ipv4_address_ipv6 ()
{
/* Port wildcard, IPv4 address, IPv6 enabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
void *sb = test_context_socket (ZMQ_REP);
void *sc = test_context_socket (ZMQ_REQ);
rc = zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
const int ipv6 = is_ipv6_available ();
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int)));
rc = zmq_bind (sb, "tcp://127.0.0.1:*");
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "tcp://127.0.0.1:*"));
endpoint_len = sizeof (endpoint);
char endpoint[256];
size_t endpoint_len = sizeof (endpoint);
memset (endpoint, 0, endpoint_len);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len));
rc = zmq_connect (sc, endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint));
bounce (sb, sc);
rc = zmq_disconnect (sc, endpoint);
assert (rc == 0);
rc = zmq_unbind (sb, endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (sc, endpoint));
TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (sb, endpoint));
test_context_socket_close (sc);
test_context_socket_close (sb);
}
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
void test_port_wildcard_ipv6_address ()
{
const int ipv6 = is_ipv6_available ();
if (!ipv6)
TEST_IGNORE_MESSAGE ("ipv6 is not available");
if (ipv6) {
/* Port wildcard, IPv6 address, IPv6 enabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
/* Port wildcard, IPv6 address, IPv6 enabled */
void *sb = test_context_socket (ZMQ_REP);
void *sc = test_context_socket (ZMQ_REQ);
rc = zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int)));
rc = zmq_bind (sb, "tcp://[::1]:*");
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "tcp://[::1]:*"));
endpoint_len = sizeof (endpoint);
memset (endpoint, 0, endpoint_len);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len);
assert (rc == 0);
char endpoint[256];
size_t endpoint_len = sizeof (endpoint);
memset (endpoint, 0, endpoint_len);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len));
rc = zmq_connect (sc, endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint));
bounce (sb, sc);
bounce (sb, sc);
rc = zmq_disconnect (sc, endpoint);
assert (rc == 0);
rc = zmq_unbind (sb, endpoint);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (sc, endpoint));
TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (sb, endpoint));
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
}
test_context_socket_close (sc);
test_context_socket_close (sb);
}
rc = zmq_ctx_term (ctx);
assert (rc == 0);
int main ()
{
setup_test_environment ();
return 0;
UNITY_BEGIN ();
RUN_TEST (test_address_wildcard_ipv4);
RUN_TEST (test_address_wildcard_ipv6);
RUN_TEST (test_port_wildcard_ipv4_address);
RUN_TEST (test_port_wildcard_ipv4_address_ipv6);
RUN_TEST (test_port_wildcard_ipv6_address);
return UNITY_END ();
}
......@@ -28,570 +28,430 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int test_basic ()
void setUp ()
{
void *ctx = zmq_ctx_new ();
assert (ctx);
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
void test_basic ()
{
// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_XPUB);
assert (pub);
void *pub = test_context_socket (ZMQ_XPUB);
int manual = 1;
int rc = zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4);
assert (rc == 0);
rc = zmq_bind (pub, "inproc://soname");
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4));
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
// Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_XSUB);
assert (sub);
rc = zmq_connect (sub, "inproc://soname");
assert (rc == 0);
void *sub = test_context_socket (ZMQ_XSUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
// Subscribe for A
char subscription[2] = {1, 'A'};
rc = zmq_send_const (sub, subscription, 2, 0);
assert (rc == 2);
char buffer[2];
const char subscription[] = {1, 'A', 0};
send_string_expect_success (sub, subscription, 0);
// Receive subscriptions from subscriber
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
recv_string_expect_success (pub, subscription, 0);
// Subscribe socket for B instead
rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "B", 1);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "B", 1));
// Sending A message and B Message
rc = zmq_send_const (pub, "A", 1, 0);
assert (rc == 1);
rc = zmq_send_const (pub, "B", 1, 0);
assert (rc == 1);
send_string_expect_success (pub, "A", 0);
send_string_expect_success (pub, "B", 0);
rc = zmq_recv (sub, buffer, 1, ZMQ_DONTWAIT);
assert (rc == 1);
assert (buffer[0] == 'B');
recv_string_expect_success (sub, "B", ZMQ_DONTWAIT);
// Clean up.
rc = zmq_close (pub);
assert (rc == 0);
rc = zmq_close (sub);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
test_context_socket_close (pub);
test_context_socket_close (sub);
}
int test_unsubscribe_manual ()
void test_unsubscribe_manual ()
{
void *ctx = zmq_ctx_new ();
assert (ctx);
// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_XPUB);
assert (pub);
int rc = zmq_bind (pub, "inproc://soname");
assert (rc == 0);
void *pub = test_context_socket (ZMQ_XPUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
// set pub socket options
int manual = 1;
rc = zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, sizeof (manual)));
// Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_XSUB);
assert (sub);
rc = zmq_connect (sub, "inproc://soname");
assert (rc == 0);
void *sub = test_context_socket (ZMQ_XSUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
// Subscribe for A
char subscription1[2] = {1, 'A'};
rc = zmq_send_const (sub, subscription1, 2, 0);
assert (rc == 2);
const uint8_t subscription1[] = {1, 'A'};
send_array_expect_success (sub, subscription1, 0);
// Subscribe for B
char subscription2[2] = {1, 'B'};
rc = zmq_send_const (sub, subscription2, 2, 0);
assert (rc == 2);
const uint8_t subscription2[] = {1, 'B'};
send_array_expect_success (sub, subscription2, 0);
char buffer[3];
// Receive subscription "A" from subscriber
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, subscription1, 0);
// Subscribe socket for XA instead
rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
// Receive subscription "B" from subscriber
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 1);
assert (buffer[1] == 'B');
recv_array_expect_success (pub, subscription2, 0);
// Subscribe socket for XB instead
rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
// Unsubscribe from A
char unsubscription1[2] = {0, 'A'};
rc = zmq_send_const (sub, unsubscription1, 2, 0);
assert (rc == 2);
const uint8_t unsubscription1[2] = {0, 'A'};
send_array_expect_success (sub, unsubscription1, 0);
// Receive unsubscription "A" from subscriber
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 0);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, unsubscription1, 0);
// Unsubscribe socket from XA instead
rc = zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
// Sending messages XA, XB
rc = zmq_send_const (pub, "XA", 2, 0);
assert (rc == 2);
rc = zmq_send_const (pub, "XB", 2, 0);
assert (rc == 2);
send_string_expect_success (pub, "XA", 0);
send_string_expect_success (pub, "XB", 0);
// Subscriber should receive XB only
rc = zmq_recv (sub, buffer, 2, ZMQ_DONTWAIT);
assert (rc == 2);
assert (buffer[0] == 'X');
assert (buffer[1] == 'B');
recv_string_expect_success (sub, "XB", ZMQ_DONTWAIT);
// Close subscriber
rc = zmq_close (sub);
assert (rc == 0);
test_context_socket_close (sub);
// Receive unsubscription "B"
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 0);
assert (buffer[1] == 'B');
const char unsubscription2[2] = {0, 'B'};
TEST_ASSERT_EQUAL_INT (
sizeof unsubscription2,
TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (pub, buffer, sizeof buffer, 0)));
TEST_ASSERT_EQUAL_INT8_ARRAY (unsubscription2, buffer,
sizeof unsubscription2);
// Unsubscribe socket from XB instead
rc = zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XB", 2);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XB", 2));
// Clean up.
rc = zmq_close (pub);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
test_context_socket_close (pub);
}
int test_xpub_proxy_unsubscribe_on_disconnect (void)
void test_xpub_proxy_unsubscribe_on_disconnect ()
{
const char *topic = "1";
const char *payload = "X";
const uint8_t topic_buff[] = {"1"};
const uint8_t payload_buff[] = {"X"};
size_t len = MAX_SOCKET_STRING;
char my_endpoint_backend[MAX_SOCKET_STRING];
char my_endpoint_frontend[MAX_SOCKET_STRING];
int manual = 1;
void *ctx = zmq_ctx_new ();
assert (ctx);
// proxy frontend
void *xsub_proxy = zmq_socket (ctx, ZMQ_XSUB);
assert (xsub_proxy);
assert (zmq_bind (xsub_proxy, "tcp://127.0.0.1:*") == 0);
int rc = zmq_getsockopt (xsub_proxy, ZMQ_LAST_ENDPOINT,
my_endpoint_frontend, &len);
assert (rc == 0);
void *xsub_proxy = test_context_socket (ZMQ_XSUB);
bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
sizeof my_endpoint_frontend);
// proxy backend
void *xpub_proxy = zmq_socket (ctx, ZMQ_XPUB);
assert (xpub_proxy);
assert (zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4) == 0);
assert (zmq_bind (xpub_proxy, "tcp://127.0.0.1:*") == 0);
len = MAX_SOCKET_STRING;
rc =
zmq_getsockopt (xpub_proxy, ZMQ_LAST_ENDPOINT, my_endpoint_backend, &len);
assert (rc == 0);
void *xpub_proxy = test_context_socket (ZMQ_XPUB);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4));
bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
sizeof my_endpoint_backend);
// publisher
void *pub = zmq_socket (ctx, ZMQ_PUB);
assert (zmq_connect (pub, my_endpoint_frontend) == 0);
void *pub = test_context_socket (ZMQ_PUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
// first subscriber subscribes
void *sub1 = zmq_socket (ctx, ZMQ_SUB);
assert (sub1);
assert (zmq_connect (sub1, my_endpoint_backend) == 0);
assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic, 1) == 0);
void *sub1 = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_buff, 1));
// wait
msleep (SETTLE_TIME);
// proxy reroutes and confirms subscriptions
char sub_buff[2];
assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2);
assert (sub_buff[0] == 1);
assert (sub_buff[1] == *topic);
assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic, 1) == 0);
assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
const uint8_t subscription[2] = {1, *topic_buff};
recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
send_array_expect_success (xsub_proxy, subscription, 0);
// second subscriber subscribes
void *sub2 = zmq_socket (ctx, ZMQ_SUB);
assert (sub2);
assert (zmq_connect (sub2, my_endpoint_backend) == 0);
assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic, 1) == 0);
void *sub2 = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic_buff, 1));
// wait
msleep (SETTLE_TIME);
// proxy reroutes
assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2);
assert (sub_buff[0] == 1);
assert (sub_buff[1] == *topic);
assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic, 1) == 0);
assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
send_array_expect_success (xsub_proxy, subscription, 0);
// wait
msleep (SETTLE_TIME);
// let publisher send a msg
assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (pub, payload, 1, 0) == 1);
send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
send_array_expect_success (pub, payload_buff, 0);
// wait
msleep (SETTLE_TIME);
// proxy reroutes data messages to subscribers
char topic_buff[1];
char data_buff[1];
assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1);
assert (topic_buff[0] == *topic);
assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1);
assert (data_buff[0] == *payload);
assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
recv_array_expect_success (xsub_proxy, topic_buff, ZMQ_DONTWAIT);
recv_array_expect_success (xsub_proxy, payload_buff, ZMQ_DONTWAIT);
send_array_expect_success (xpub_proxy, topic_buff, ZMQ_SNDMORE);
send_array_expect_success (xpub_proxy, payload_buff, 0);
// wait
msleep (SETTLE_TIME);
// each subscriber should now get a message
assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1);
assert (topic_buff[0] == *topic);
assert (zmq_recv (sub2, data_buff, 1, ZMQ_DONTWAIT) == 1);
assert (data_buff[0] == *payload);
recv_array_expect_success (sub2, topic_buff, ZMQ_DONTWAIT);
recv_array_expect_success (sub2, payload_buff, ZMQ_DONTWAIT);
assert (zmq_recv (sub1, topic_buff, 1, ZMQ_DONTWAIT) == 1);
assert (topic_buff[0] == *topic);
assert (zmq_recv (sub1, data_buff, 1, ZMQ_DONTWAIT) == 1);
assert (data_buff[0] == *payload);
recv_array_expect_success (sub1, topic_buff, ZMQ_DONTWAIT);
recv_array_expect_success (sub1, payload_buff, ZMQ_DONTWAIT);
// Disconnect both subscribers
assert (zmq_close (sub1) == 0);
assert (zmq_close (sub2) == 0);
test_context_socket_close (sub1);
test_context_socket_close (sub2);
// wait
msleep (SETTLE_TIME);
// unsubscribe messages are passed from proxy to publisher
assert (zmq_recv (xpub_proxy, sub_buff, 2, 0) == 2);
assert (sub_buff[0] == 0);
assert (sub_buff[1] == *topic);
assert (zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic, 1) == 0);
assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
const uint8_t unsubscription[] = {0, *topic_buff};
recv_array_expect_success (xpub_proxy, unsubscription, 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
send_array_expect_success (xsub_proxy, unsubscription, 0);
// should receive another unsubscribe msg
assert (zmq_recv (xpub_proxy, sub_buff, 2, 0) == 2
&& "Should receive the second unsubscribe message.");
assert (sub_buff[0] == 0);
assert (sub_buff[1] == *topic);
assert (zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic, 1) == 0);
assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
recv_array_expect_success (xpub_proxy, unsubscription, 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
send_array_expect_success (xsub_proxy, unsubscription, 0);
// wait
msleep (SETTLE_TIME);
// let publisher send a msg
assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (pub, payload, 1, 0) == 1);
send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
send_array_expect_success (pub, payload_buff, 0);
// wait
msleep (SETTLE_TIME);
// nothing should come to the proxy
assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == -1);
assert (errno == EAGAIN);
assert (zmq_close (pub) == 0);
assert (zmq_close (xpub_proxy) == 0);
assert (zmq_close (xsub_proxy) == 0);
assert (zmq_ctx_term (ctx) == 0);
char buffer[1];
TEST_ASSERT_FAILURE_ERRNO (
EAGAIN, zmq_recv (xsub_proxy, buffer, sizeof buffer, ZMQ_DONTWAIT));
return 0;
test_context_socket_close (pub);
test_context_socket_close (xpub_proxy);
test_context_socket_close (xsub_proxy);
}
int test_missing_subscriptions (void)
void test_missing_subscriptions ()
{
const char *topic1 = "1";
const char *topic2 = "2";
const char *payload = "X";
size_t len = MAX_SOCKET_STRING;
char my_endpoint_backend[MAX_SOCKET_STRING];
char my_endpoint_frontend[MAX_SOCKET_STRING];
int manual = 1;
void *ctx = zmq_ctx_new ();
assert (ctx);
// proxy frontend
void *xsub_proxy = zmq_socket (ctx, ZMQ_XSUB);
assert (xsub_proxy);
assert (zmq_bind (xsub_proxy, "tcp://127.0.0.1:*") == 0);
int rc = zmq_getsockopt (xsub_proxy, ZMQ_LAST_ENDPOINT,
my_endpoint_frontend, &len);
assert (rc == 0);
void *xsub_proxy = test_context_socket (ZMQ_XSUB);
bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
sizeof my_endpoint_frontend);
// proxy backend
void *xpub_proxy = zmq_socket (ctx, ZMQ_XPUB);
assert (xpub_proxy);
assert (zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4) == 0);
assert (zmq_bind (xpub_proxy, "tcp://127.0.0.1:*") == 0);
len = MAX_SOCKET_STRING;
rc =
zmq_getsockopt (xpub_proxy, ZMQ_LAST_ENDPOINT, my_endpoint_backend, &len);
assert (rc == 0);
void *xpub_proxy = test_context_socket (ZMQ_XPUB);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4));
bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
sizeof my_endpoint_backend);
// publisher
void *pub = zmq_socket (ctx, ZMQ_PUB);
assert (zmq_connect (pub, my_endpoint_frontend) == 0);
void *pub = test_context_socket (ZMQ_PUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
// Here's the problem: because subscribers subscribe in quick succession,
// the proxy is unable to confirm the first subscription before receiving
// the second. This causes the first subscription to get lost.
// first subscriber
void *sub1 = zmq_socket (ctx, ZMQ_SUB);
assert (sub1);
assert (zmq_connect (sub1, my_endpoint_backend) == 0);
assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic1, 1) == 0);
void *sub1 = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic1, 1));
// second subscriber
void *sub2 = zmq_socket (ctx, ZMQ_SUB);
assert (sub2);
assert (zmq_connect (sub2, my_endpoint_backend) == 0);
assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1) == 0);
void *sub2 = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1));
// wait
msleep (SETTLE_TIME);
// proxy now reroutes and confirms subscriptions
char buffer[2];
assert (zmq_recv (xpub_proxy, buffer, 2, ZMQ_DONTWAIT) == 2);
assert (buffer[0] == 1);
assert (buffer[1] == *topic1);
assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1) == 0);
assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2);
assert (zmq_recv (xpub_proxy, buffer, 2, ZMQ_DONTWAIT) == 2);
assert (buffer[0] == 1);
assert (buffer[1] == *topic2);
assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1) == 0);
assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2);
const uint8_t subscription1[] = {1, static_cast<uint8_t> (topic1[0])};
recv_array_expect_success (xpub_proxy, subscription1, ZMQ_DONTWAIT);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1));
send_array_expect_success (xsub_proxy, subscription1, 0);
const uint8_t subscription2[] = {1, static_cast<uint8_t> (topic2[0])};
recv_array_expect_success (xpub_proxy, subscription2, ZMQ_DONTWAIT);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1));
send_array_expect_success (xsub_proxy, subscription2, 0);
// wait
msleep (SETTLE_TIME);
// let publisher send 2 msgs, each with its own topic
assert (zmq_send (pub, topic1, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (pub, payload, 1, 0) == 1);
assert (zmq_send (pub, topic2, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (pub, payload, 1, 0) == 1);
// let publisher send 2 msgs, each with its own topic_buff
send_string_expect_success (pub, topic1, ZMQ_SNDMORE);
send_string_expect_success (pub, payload, 0);
send_string_expect_success (pub, topic2, ZMQ_SNDMORE);
send_string_expect_success (pub, payload, 0);
// wait
msleep (SETTLE_TIME);
// proxy reroutes data messages to subscribers
char topic_buff[1];
char data_buff[1];
assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1);
assert (topic_buff[0] == *topic1);
assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1);
assert (data_buff[0] == *payload);
assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1);
assert (topic_buff[0] == *topic2);
assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1);
assert (data_buff[0] == *payload);
assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
recv_string_expect_success (xsub_proxy, topic1, ZMQ_DONTWAIT);
recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
send_string_expect_success (xpub_proxy, topic1, ZMQ_SNDMORE);
send_string_expect_success (xpub_proxy, payload, 0);
recv_string_expect_success (xsub_proxy, topic2, ZMQ_DONTWAIT);
recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
send_string_expect_success (xpub_proxy, topic2, ZMQ_SNDMORE);
send_string_expect_success (xpub_proxy, payload, 0);
// wait
msleep (SETTLE_TIME);
// each subscriber should now get a message
assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1);
assert (topic_buff[0] == *topic2);
assert (zmq_recv (sub2, data_buff, 1, ZMQ_DONTWAIT) == 1);
assert (data_buff[0] == *payload);
recv_string_expect_success (sub2, topic2, ZMQ_DONTWAIT);
recv_string_expect_success (sub2, payload, ZMQ_DONTWAIT);
assert (zmq_recv (sub1, topic_buff, 1, ZMQ_DONTWAIT) == 1);
assert (topic_buff[0] == *topic1);
assert (zmq_recv (sub1, data_buff, 1, ZMQ_DONTWAIT) == 1);
assert (data_buff[0] == *payload);
recv_string_expect_success (sub1, topic1, ZMQ_DONTWAIT);
recv_string_expect_success (sub1, payload, ZMQ_DONTWAIT);
// Clean up
assert (zmq_close (sub1) == 0);
assert (zmq_close (sub2) == 0);
assert (zmq_close (pub) == 0);
assert (zmq_close (xpub_proxy) == 0);
assert (zmq_close (xsub_proxy) == 0);
assert (zmq_ctx_term (ctx) == 0);
return 0;
test_context_socket_close (sub1);
test_context_socket_close (sub2);
test_context_socket_close (pub);
test_context_socket_close (xpub_proxy);
test_context_socket_close (xsub_proxy);
}
int test_unsubscribe_cleanup (void)
void test_unsubscribe_cleanup ()
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_XPUB);
assert (pub);
void *pub = test_context_socket (ZMQ_XPUB);
int manual = 1;
int rc = zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4);
assert (rc == 0);
rc = zmq_bind (pub, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (pub, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4));
bind_loopback_ipv4 (pub, my_endpoint, sizeof my_endpoint);
// Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_XSUB);
assert (sub);
rc = zmq_connect (sub, my_endpoint);
assert (rc == 0);
void *sub = test_context_socket (ZMQ_XSUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
// Subscribe for A
char subscription[2] = {1, 'A'};
rc = zmq_send_const (sub, subscription, 2, 0);
assert (rc == 2);
const uint8_t subscription1[2] = {1, 'A'};
send_array_expect_success (sub, subscription1, 0);
char buffer[2];
// Receive subscriptions from subscriber
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2);
assert (rc == 0);
recv_array_expect_success (pub, subscription1, 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
// send 2 messages
rc = zmq_send_const (pub, "XA", 2, 0);
assert (rc == 2);
rc = zmq_send_const (pub, "XB", 2, 0);
assert (rc == 2);
send_string_expect_success (pub, "XA", 0);
send_string_expect_success (pub, "XB", 0);
// receive the single message
rc = zmq_recv (sub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 'X');
assert (buffer[1] == 'A');
recv_string_expect_success (sub, "XA", 0);
// should be nothing left in the queue
rc = zmq_recv (sub, buffer, 2, ZMQ_DONTWAIT);
assert (rc == -1);
char buffer[2];
TEST_ASSERT_FAILURE_ERRNO (
EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
// close the socket
rc = zmq_close (sub);
assert (rc == 0);
test_context_socket_close (sub);
// closing the socket will result in an unsubscribe event
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 0);
assert (buffer[1] == 'A');
const uint8_t unsubscription[2] = {0, 'A'};
recv_array_expect_success (pub, unsubscription, 0);
// this doesn't really do anything
// there is no last_pipe set it will just fail silently
rc = zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
// reconnect
sub = zmq_socket (ctx, ZMQ_XSUB);
rc = zmq_connect (sub, my_endpoint);
assert (rc == 0);
sub = test_context_socket (ZMQ_XSUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
// send a subscription for B
subscription[0] = 1;
subscription[1] = 'B';
rc = zmq_send (sub, subscription, 2, 0);
assert (rc == 2);
const uint8_t subscription2[2] = {1, 'B'};
send_array_expect_success (sub, subscription2, 0);
// receive the subscription, overwrite it to XB
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 1);
assert (buffer[1] == 'B');
rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2);
assert (rc == 0);
recv_array_expect_success (pub, subscription2, 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
// send 2 messages
rc = zmq_send_const (pub, "XA", 2, 0);
assert (rc == 2);
rc = zmq_send_const (pub, "XB", 2, 0);
assert (rc == 2);
send_string_expect_success (pub, "XA", 0);
send_string_expect_success (pub, "XB", 0);
// receive the single message
rc = zmq_recv (sub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 'X');
assert (buffer[1] == 'B'); // this assertion will fail
recv_string_expect_success (sub, "XB", 0);
// should be nothing left in the queue
rc = zmq_recv (sub, buffer, 2, ZMQ_DONTWAIT);
assert (rc == -1);
TEST_ASSERT_FAILURE_ERRNO (
EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
// Clean up.
rc = zmq_close (pub);
assert (rc == 0);
rc = zmq_close (sub);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
test_context_socket_close (pub);
test_context_socket_close (sub);
}
int main (void)
int main ()
{
setup_test_environment ();
test_basic ();
test_unsubscribe_manual ();
test_xpub_proxy_unsubscribe_on_disconnect ();
test_missing_subscriptions ();
test_unsubscribe_cleanup ();
return 0;
UNITY_BEGIN ();
RUN_TEST (test_basic);
RUN_TEST (test_unsubscribe_manual);
RUN_TEST (test_xpub_proxy_unsubscribe_on_disconnect);
RUN_TEST (test_missing_subscriptions);
RUN_TEST (test_unsubscribe_cleanup);
return UNITY_END ();
}
......@@ -28,464 +28,303 @@
*/
#include "testutil.hpp"
#include <unity.h>
#include "testutil_unity.hpp"
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
const uint8_t unsubscribe_a_msg[] = {0, 'A'};
const uint8_t subscribe_a_msg[] = {1, 'A'};
const uint8_t subscribe_b_msg[] = {1, 'B'};
const char test_endpoint[] = "inproc://soname";
const char topic_a[] = "A";
const char topic_b[] = "B";
void test_xpub_verbose_one_sub ()
{
int rc;
char buffer[2];
void *ctx = zmq_ctx_new ();
TEST_ASSERT_NOT_NULL (ctx);
void *pub = test_context_socket (ZMQ_XPUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, test_endpoint));
void *pub = zmq_socket (ctx, ZMQ_XPUB);
TEST_ASSERT_NOT_NULL (pub);
rc = zmq_bind (pub, "inproc://soname");
TEST_ASSERT_EQUAL_INT (0, rc);
void *sub = zmq_socket (ctx, ZMQ_SUB);
TEST_ASSERT_NOT_NULL (sub);
rc = zmq_connect (sub, "inproc://soname");
TEST_ASSERT_EQUAL_INT (0, rc);
void *sub = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, test_endpoint));
// Subscribe for A
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
// Receive subscriptions from subscriber
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, subscribe_a_msg, 0);
// Subscribe socket for B instead
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "B", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_b, 1));
// Receive subscriptions from subscriber
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 1);
assert (buffer[1] == 'B');
recv_array_expect_success (pub, subscribe_b_msg, 0);
// Subscribe again for A again
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
// This time it is duplicated, so it will be filtered out
rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
int verbose = 1;
rc = zmq_setsockopt (pub, ZMQ_XPUB_VERBOSE, &verbose, sizeof (int));
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pub, ZMQ_XPUB_VERBOSE, &verbose, sizeof (int)));
// Subscribe socket for A again
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
// This time with VERBOSE the duplicated sub will be received
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, subscribe_a_msg, 0);
// Sending A message and B Message
rc = zmq_send_const (pub, "A", 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
send_string_expect_success (pub, topic_a, 0);
send_string_expect_success (pub, topic_b, 0);
recv_string_expect_success (sub, topic_a, 0);
recv_string_expect_success (sub, topic_b, 0);
rc = zmq_send_const (pub, "B", 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
// Clean up.
test_context_socket_close (pub);
test_context_socket_close (sub);
}
rc = zmq_recv (sub, buffer, 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
assert (buffer[0] == 'A');
void create_xpub_with_2_subs (void **pub_, void **sub0_, void **sub1_)
{
*pub_ = test_context_socket (ZMQ_XPUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (*pub_, test_endpoint));
rc = zmq_recv (sub, buffer, 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
assert (buffer[0] == 'B');
*sub0_ = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (*sub0_, test_endpoint));
// Clean up.
rc = zmq_close (pub);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_close (sub);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_ctx_term (ctx);
TEST_ASSERT_EQUAL_INT (0, rc);
*sub1_ = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (*sub1_, test_endpoint));
}
void test_xpub_verbose_two_subs ()
void create_duplicate_subscription (void *pub_, void *sub0_, void *sub1_)
{
int rc;
char buffer[2];
void *ctx = zmq_ctx_new ();
TEST_ASSERT_NOT_NULL (ctx);
void *pub = zmq_socket (ctx, ZMQ_XPUB);
TEST_ASSERT_NOT_NULL (pub);
rc = zmq_bind (pub, "inproc://soname");
TEST_ASSERT_EQUAL_INT (0, rc);
void *sub0 = zmq_socket (ctx, ZMQ_SUB);
TEST_ASSERT_NOT_NULL (sub0);
rc = zmq_connect (sub0, "inproc://soname");
TEST_ASSERT_EQUAL_INT (0, rc);
void *sub1 = zmq_socket (ctx, ZMQ_SUB);
TEST_ASSERT_NOT_NULL (sub1);
rc = zmq_connect (sub1, "inproc://soname");
TEST_ASSERT_EQUAL_INT (0, rc);
// Subscribe for A on the first socket
rc = zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
// Subscribe for A
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub0_, ZMQ_SUBSCRIBE, topic_a, 1));
// Receive subscriptions from subscriber
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
recv_array_expect_success (pub_, subscribe_a_msg, 0);
// Subscribe for A on the second socket
rc = zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
// Subscribe again for A on the other socket
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub1_, ZMQ_SUBSCRIBE, topic_a, 1));
// This time it is duplicated, so it will be filtered out
rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
// This time it is duplicated, so it will be filtered out by XPUB
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub_, NULL, 0, ZMQ_DONTWAIT));
}
void test_xpub_verbose_two_subs ()
{
void *pub, *sub0, *sub1;
create_xpub_with_2_subs (&pub, &sub0, &sub1);
create_duplicate_subscription (pub, sub0, sub1);
// Subscribe socket for B instead
rc = zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, "B", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, topic_b, 1));
// Receive subscriptions from subscriber
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 1);
assert (buffer[1] == 'B');
recv_array_expect_success (pub, subscribe_b_msg, 0);
int verbose = 1;
rc = zmq_setsockopt (pub, ZMQ_XPUB_VERBOSE, &verbose, sizeof (int));
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pub, ZMQ_XPUB_VERBOSE, &verbose, sizeof (int)));
// Subscribe socket for A again
rc = zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_a, 1));
// This time with VERBOSE the duplicated sub will be received
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, subscribe_a_msg, 0);
// Sending A message and B Message
rc = zmq_send_const (pub, "A", 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
rc = zmq_send_const (pub, "B", 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
send_string_expect_success (pub, topic_a, 0);
rc = zmq_recv (sub0, buffer, 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
assert (buffer[0] == 'A');
send_string_expect_success (pub, topic_b, 0);
rc = zmq_recv (sub1, buffer, 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
assert (buffer[0] == 'A');
rc = zmq_recv (sub0, buffer, 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
assert (buffer[0] == 'B');
recv_string_expect_success (sub0, topic_a, 0);
recv_string_expect_success (sub1, topic_a, 0);
recv_string_expect_success (sub0, topic_b, 0);
// Clean up.
rc = zmq_close (pub);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_close (sub0);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_close (sub1);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_ctx_term (ctx);
TEST_ASSERT_EQUAL_INT (0, rc);
test_context_socket_close (pub);
test_context_socket_close (sub0);
test_context_socket_close (sub1);
}
void test_xpub_verboser_one_sub ()
{
int rc;
char buffer[3];
void *ctx = zmq_ctx_new ();
TEST_ASSERT_NOT_NULL (ctx);
// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_XPUB);
TEST_ASSERT_NOT_NULL (pub);
rc = zmq_bind (pub, "inproc://soname");
TEST_ASSERT_EQUAL_INT (0, rc);
void *pub = test_context_socket (ZMQ_XPUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, test_endpoint));
// Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_SUB);
TEST_ASSERT_NOT_NULL (sub);
rc = zmq_connect (sub, "inproc://soname");
TEST_ASSERT_EQUAL_INT (0, rc);
void *sub = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, test_endpoint));
// Unsubscribe for A, does not exist yet
rc = zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
// Does not exist, so it will be filtered out by XSUB
rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
// Subscribe for A
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
// Receive subscriptions from subscriber
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, subscribe_a_msg, 0);
// Subscribe again for A again, XSUB will increase refcount
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
// This time it is duplicated, so it will be filtered out by XPUB
rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
// Unsubscribe for A, this time it exists in XPUB
rc = zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
// XSUB refcounts and will not actually send unsub to PUB until the number
// of unsubs match the earlier subs
rc = zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
// Receive unsubscriptions from subscriber
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 0);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, unsubscribe_a_msg, 0);
// XSUB only sends the last and final unsub, so XPUB will only receive 1
rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
// Unsubscribe for A, does not exist anymore
rc = zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
// Does not exist, so it will be filtered out by XSUB
rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
int verbose = 1;
rc = zmq_setsockopt (pub, ZMQ_XPUB_VERBOSER, &verbose, sizeof (int));
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pub, ZMQ_XPUB_VERBOSER, &verbose, sizeof (int)));
// Subscribe socket for A again
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
// Receive subscriptions from subscriber, did not exist anymore
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, subscribe_a_msg, 0);
// Sending A message to make sure everything still works
rc = zmq_send_const (pub, "A", 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
send_string_expect_success (pub, topic_a, 0);
rc = zmq_recv (sub, buffer, 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
assert (buffer[0] == 'A');
recv_string_expect_success (sub, topic_a, 0);
// Unsubscribe for A, this time it exists
rc = zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
// Receive unsubscriptions from subscriber
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 0);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, unsubscribe_a_msg, 0);
// Unsubscribe for A again, it does not exist anymore so XSUB will filter
rc = zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
// XSUB only sends unsub if it matched it in its trie, IOW: it will only
// send it if it existed in the first place even with XPUB_VERBBOSER
rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
// Clean up.
rc = zmq_close (pub);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_close (sub);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_ctx_term (ctx);
TEST_ASSERT_EQUAL_INT (0, rc);
test_context_socket_close (pub);
test_context_socket_close (sub);
}
void test_xpub_verboser_two_subs ()
{
int rc;
char buffer[3];
void *ctx = zmq_ctx_new ();
TEST_ASSERT_NOT_NULL (ctx);
void *pub = zmq_socket (ctx, ZMQ_XPUB);
TEST_ASSERT_NOT_NULL (pub);
rc = zmq_bind (pub, "inproc://soname");
TEST_ASSERT_EQUAL_INT (0, rc);
void *sub0 = zmq_socket (ctx, ZMQ_SUB);
TEST_ASSERT_NOT_NULL (sub0);
rc = zmq_connect (sub0, "inproc://soname");
TEST_ASSERT_EQUAL_INT (0, rc);
void *sub1 = zmq_socket (ctx, ZMQ_SUB);
TEST_ASSERT_NOT_NULL (sub1);
rc = zmq_connect (sub1, "inproc://soname");
TEST_ASSERT_EQUAL_INT (0, rc);
// Subscribe for A
rc = zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
// Receive subscriptions from subscriber
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
// Subscribe again for A on the other socket
rc = zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
// This time it is duplicated, so it will be filtered out by XPUB
rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
void *pub, *sub0, *sub1;
create_xpub_with_2_subs (&pub, &sub0, &sub1);
create_duplicate_subscription (pub, sub0, sub1);
// Unsubscribe for A, this time it exists in XPUB
rc = zmq_setsockopt (sub0, ZMQ_UNSUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub0, ZMQ_UNSUBSCRIBE, topic_a, 1));
// sub1 is still subscribed, so no notification
rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
// Unsubscribe the second socket to trigger the notification
rc = zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, topic_a, 1));
// Receive unsubscriptions since all sockets are gone
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 0);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, unsubscribe_a_msg, 0);
// Make really sure there is only one notification
rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
int verbose = 1;
rc = zmq_setsockopt (pub, ZMQ_XPUB_VERBOSER, &verbose, sizeof (int));
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pub, ZMQ_XPUB_VERBOSER, &verbose, sizeof (int)));
// Subscribe socket for A again
rc = zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, topic_a, 1));
// Subscribe socket for A again
rc = zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_a, 1));
// Receive subscriptions from subscriber, did not exist anymore
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, subscribe_a_msg, 0);
// VERBOSER is set, so subs from both sockets are received
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, subscribe_a_msg, 0);
// Sending A message to make sure everything still works
rc = zmq_send_const (pub, "A", 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
rc = zmq_recv (sub0, buffer, 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
assert (buffer[0] == 'A');
send_string_expect_success (pub, topic_a, 0);
rc = zmq_recv (sub1, buffer, 1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
assert (buffer[0] == 'A');
recv_string_expect_success (sub0, topic_a, 0);
recv_string_expect_success (sub1, topic_a, 0);
// Unsubscribe for A
rc = zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, topic_a, 1));
// Receive unsubscriptions from first subscriber due to VERBOSER
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 0);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, unsubscribe_a_msg, 0);
// Unsubscribe for A again from the other socket
rc = zmq_setsockopt (sub0, ZMQ_UNSUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub0, ZMQ_UNSUBSCRIBE, topic_a, 1));
// Receive unsubscriptions from first subscriber due to VERBOSER
rc = zmq_recv (pub, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
assert (buffer[0] == 0);
assert (buffer[1] == 'A');
recv_array_expect_success (pub, unsubscribe_a_msg, 0);
// Unsubscribe again to make sure it gets filtered now
rc = zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, topic_a, 1));
// Unmatched, so XSUB filters even with VERBOSER
rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
// Clean up.
rc = zmq_close (pub);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_close (sub0);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_close (sub1);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_ctx_term (ctx);
TEST_ASSERT_EQUAL_INT (0, rc);
test_context_socket_close (pub);
test_context_socket_close (sub0);
test_context_socket_close (sub1);
}
int main (void)
int main ()
{
setup_test_environment ();
......@@ -495,5 +334,5 @@ int main (void)
RUN_TEST (test_xpub_verboser_one_sub);
RUN_TEST (test_xpub_verboser_two_subs);
return 0;
return UNITY_END ();
}
......@@ -118,6 +118,32 @@ void recv_string_expect_success (void *socket_, const char *str_, int flags_)
TEST_ASSERT_EQUAL_STRING_LEN (str_, buffer, len);
}
template <size_t SIZE>
void send_array_expect_success (void *socket_,
const uint8_t (&array_)[SIZE],
int flags_)
{
const int rc = zmq_send (socket_, array_, SIZE, flags_);
TEST_ASSERT_EQUAL_INT (static_cast<int> (SIZE), rc);
}
template <size_t SIZE>
void recv_array_expect_success (void *socket_,
const uint8_t (&array_)[SIZE],
int flags_)
{
char buffer[255];
TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (sizeof (buffer), SIZE,
"recv_string_expect_success cannot be "
"used for strings longer than 255 "
"characters");
const int rc = TEST_ASSERT_SUCCESS_ERRNO (
zmq_recv (socket_, buffer, sizeof (buffer), flags_));
TEST_ASSERT_EQUAL_INT (static_cast<int> (SIZE), rc);
TEST_ASSERT_EQUAL_UINT8_ARRAY (array_, buffer, SIZE);
}
// do not call from tests directly, use setup_test_context, get_test_context and teardown_test_context only
void *internal_manage_test_context (bool init_, bool clear_)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment