Commit 99412c81 authored by Lionel Flandrin's avatar Lionel Flandrin

Problem: ZMQ doesn't expose the MULTICAST_LOOP socket option

Solution: add a new ZMQ_MULTICAST_LOOP option for UDP sockets.
parent e9211aed
...@@ -892,6 +892,18 @@ Default value:: -1 ...@@ -892,6 +892,18 @@ Default value:: -1
Applicable socket types:: all, when using VMCI transport Applicable socket types:: all, when using VMCI transport
ZMQ_MULTICAST_LOOP: Retrieve multicast local loopback configuration
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the current multicast loopback configuration. A value of `1`
means that the multicast packets sent on this socket will be looped
back to local listening interface.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 1
Applicable socket types:: ZMQ_RADIO, when using UDP multicast transport
RETURN VALUE RETURN VALUE
------------ ------------
The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it
......
...@@ -1273,6 +1273,17 @@ Default value:: -1 ...@@ -1273,6 +1273,17 @@ Default value:: -1
Applicable socket types:: all, when using VMCI transport Applicable socket types:: all, when using VMCI transport
ZMQ_MULTICAST_LOOP: Control multicast local loopback
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
For multicast UDP sender sockets this option sets whether the data
sent should be looped back on local listening sockets.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 1
Applicable socket types:: ZMQ_RADIO, when using UDP multicast transport
RETURN VALUE RETURN VALUE
------------ ------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
......
...@@ -596,6 +596,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread); ...@@ -596,6 +596,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread);
#define ZMQ_ZAP_ENFORCE_DOMAIN 93 #define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_LOOPBACK_FASTPATH 94 #define ZMQ_LOOPBACK_FASTPATH 94
#define ZMQ_METADATA 95 #define ZMQ_METADATA 95
#define ZMQ_MULTICAST_LOOP 96
/* DRAFT 0MQ socket events and monitoring */ /* DRAFT 0MQ socket events and monitoring */
/* Unspecified system errors during handshake. Event value is an errno. */ /* Unspecified system errors during handshake. Event value is an errno. */
......
...@@ -232,6 +232,7 @@ zmq::options_t::options_t () : ...@@ -232,6 +232,7 @@ zmq::options_t::options_t () :
use_fd (-1), use_fd (-1),
zap_enforce_domain (false), zap_enforce_domain (false),
loopback_fastpath (false), loopback_fastpath (false),
multicast_loop (true),
zero_copy (true) zero_copy (true)
{ {
memset (curve_public_key, 0, CURVE_KEYSIZE); memset (curve_public_key, 0, CURVE_KEYSIZE);
...@@ -726,6 +727,11 @@ int zmq::options_t::setsockopt (int option_, ...@@ -726,6 +727,11 @@ int zmq::options_t::setsockopt (int option_,
errno = EINVAL; errno = EINVAL;
return -1; return -1;
break; break;
case ZMQ_MULTICAST_LOOP:
return do_setsockopt_int_as_bool_relaxed (optval_, optvallen_,
&multicast_loop);
default: default:
#if defined(ZMQ_ACT_MILITANT) #if defined(ZMQ_ACT_MILITANT)
// There are valid scenarios for probing with unknown socket option // There are valid scenarios for probing with unknown socket option
...@@ -1120,6 +1126,13 @@ int zmq::options_t::getsockopt (int option_, ...@@ -1120,6 +1126,13 @@ int zmq::options_t::getsockopt (int option_,
} }
break; break;
case ZMQ_MULTICAST_LOOP:
if (is_int) {
*value = multicast_loop;
return 0;
}
break;
default: default:
#if defined(ZMQ_ACT_MILITANT) #if defined(ZMQ_ACT_MILITANT)
malformed = false; malformed = false;
......
...@@ -257,6 +257,9 @@ struct options_t ...@@ -257,6 +257,9 @@ struct options_t
// Use of loopback fastpath. // Use of loopback fastpath.
bool loopback_fastpath; bool loopback_fastpath;
// Loop sent multicast packets to local sockets
bool multicast_loop;
// Use zero copy strategy for storing message content when decoding. // Use zero copy strategy for storing message content when decoding.
bool zero_copy; bool zero_copy;
......
...@@ -119,6 +119,29 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) ...@@ -119,6 +119,29 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
const ip_addr_t *out = address->resolved.udp_addr->target_addr (); const ip_addr_t *out = address->resolved.udp_addr->target_addr ();
out_address = out->as_sockaddr (); out_address = out->as_sockaddr ();
out_addrlen = out->sockaddr_len (); out_addrlen = out->sockaddr_len ();
if (out->is_multicast ()) {
int level;
int optname;
if (out->family () == AF_INET6) {
level = IPPROTO_IPV6;
optname = IPV6_MULTICAST_LOOP;
} else {
level = IPPROTO_IP;
optname = IP_MULTICAST_LOOP;
}
int loop = options.multicast_loop;
int rc = setsockopt (fd, level, optname, (char *) &loop,
sizeof (loop));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
}
} else { } else {
/// XXX fixme ? /// XXX fixme ?
out_address = (sockaddr *) &raw_address; out_address = (sockaddr *) &raw_address;
......
...@@ -54,6 +54,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_); ...@@ -54,6 +54,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_);
#define ZMQ_ZAP_ENFORCE_DOMAIN 93 #define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_LOOPBACK_FASTPATH 94 #define ZMQ_LOOPBACK_FASTPATH 94
#define ZMQ_METADATA 95 #define ZMQ_METADATA 95
#define ZMQ_MULTICAST_LOOP 96
/* DRAFT 0MQ socket events and monitoring */ /* DRAFT 0MQ socket events and monitoring */
/* Unspecified system errors during handshake. Event value is an errno. */ /* Unspecified system errors during handshake. Event value is an errno. */
......
...@@ -251,6 +251,238 @@ void test_radio_dish_udp (int ipv6_) ...@@ -251,6 +251,238 @@ void test_radio_dish_udp (int ipv6_)
} }
MAKE_TEST_V4V6 (test_radio_dish_udp) MAKE_TEST_V4V6 (test_radio_dish_udp)
#define MCAST_IPV4 "226.8.5.5"
#define MCAST_IPV6 "ff02::7a65:726f:6df1:0a01"
static const char *mcast_url (int ipv6_)
{
if (ipv6_) {
return "udp://[" MCAST_IPV6 "]:5555";
} else {
return "udp://[" MCAST_IPV4 "]:5555";
}
}
// OSX uses a different name for this socket option
#ifndef IPV6_ADD_MEMBERSHIP
#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif
// Test if multicast is available on this machine by attempting to
// send a receive a multicast datagram
static bool is_multicast_available (int ipv6_)
{
int family = ipv6_ ? AF_INET6 : AF_INET;
int bind_sock = -1;
int send_sock = -1;
int port = 5555;
bool success = false;
const char *msg = "it works";
char buf[32];
struct sockaddr_storage any;
struct sockaddr_storage mcast;
socklen_t sl;
int rc;
if (ipv6_) {
struct sockaddr_in6 *any_ipv6 = (struct sockaddr_in6 *) &any;
struct sockaddr_in6 *mcast_ipv6 = (struct sockaddr_in6 *) &mcast;
any_ipv6->sin6_family = AF_INET6;
any_ipv6->sin6_port = htons (port);
any_ipv6->sin6_flowinfo = 0;
any_ipv6->sin6_scope_id = 0;
rc = inet_pton (AF_INET6, "::", &any_ipv6->sin6_addr);
if (rc == 0) {
goto out;
}
*mcast_ipv6 = *any_ipv6;
rc = inet_pton (AF_INET6, MCAST_IPV6, &mcast_ipv6->sin6_addr);
if (rc == 0) {
goto out;
}
sl = sizeof (*any_ipv6);
} else {
struct sockaddr_in *any_ipv4 = (struct sockaddr_in *) &any;
struct sockaddr_in *mcast_ipv4 = (struct sockaddr_in *) &mcast;
any_ipv4->sin_family = AF_INET;
any_ipv4->sin_port = htons (5555);
rc = inet_pton (AF_INET, "0.0.0.0", &any_ipv4->sin_addr);
if (rc == 0) {
goto out;
}
*mcast_ipv4 = *any_ipv4;
rc = inet_pton (AF_INET, MCAST_IPV4, &mcast_ipv4->sin_addr);
if (rc == 0) {
goto out;
}
sl = sizeof (*any_ipv4);
}
bind_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
if (bind_sock < 0) {
goto out;
}
send_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
if (bind_sock < 0) {
goto out;
}
rc = bind (bind_sock, (struct sockaddr *) &any, sl);
if (rc < 0) {
goto out;
}
if (ipv6_) {
struct ipv6_mreq mreq;
struct sockaddr_in6 *mcast_ipv6 = (struct sockaddr_in6 *) &mcast;
mreq.ipv6mr_multiaddr = mcast_ipv6->sin6_addr;
mreq.ipv6mr_interface = 0;
rc = setsockopt (bind_sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq,
sizeof (mreq));
if (rc < 0) {
goto out;
}
int loop = 1;
rc = setsockopt (send_sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &loop,
sizeof (loop));
if (rc < 0) {
goto out;
}
} else {
struct ip_mreq mreq;
struct sockaddr_in *mcast_ipv4 = (struct sockaddr_in *) &mcast;
mreq.imr_multiaddr = mcast_ipv4->sin_addr;
mreq.imr_interface.s_addr = htonl (INADDR_ANY);
rc = setsockopt (bind_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
sizeof (mreq));
if (rc < 0) {
goto out;
}
int loop = 1;
rc = setsockopt (send_sock, IPPROTO_IP, IP_MULTICAST_LOOP, &loop,
sizeof (loop));
if (rc < 0) {
goto out;
}
}
msleep (SETTLE_TIME);
rc =
sendto (send_sock, msg, strlen (msg), 0, (struct sockaddr *) &mcast, sl);
if (rc < 0) {
goto out;
}
msleep (SETTLE_TIME);
rc = recvfrom (bind_sock, buf, sizeof (buf) - 1, 0, NULL, 0);
if (rc < 0) {
goto out;
}
buf[rc] = '\0';
success = (strcmp (msg, buf) == 0);
out:
if (bind_sock >= 0) {
close (bind_sock);
}
if (send_sock >= 0) {
close (send_sock);
}
return success;
}
static void test_radio_dish_mcast (int ipv6_)
{
void *radio = test_context_socket (ZMQ_RADIO);
void *dish = test_context_socket (ZMQ_DISH);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
const char *url = mcast_url (ipv6_);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));
msleep (SETTLE_TIME);
TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
msg_send_expect_success (radio, "TV", "Friends");
msg_recv_cmp (dish, "TV", "Friends");
test_context_socket_close (dish);
test_context_socket_close (radio);
}
MAKE_TEST_V4V6 (test_radio_dish_mcast)
static void test_radio_dish_no_loop (int ipv6_)
{
void *radio = test_context_socket (ZMQ_RADIO);
void *dish = test_context_socket (ZMQ_DISH);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
// Disable multicast loop
int loop = 0;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (radio, ZMQ_MULTICAST_LOOP, &loop, sizeof (int)));
const char *url = mcast_url (ipv6_);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));
msleep (SETTLE_TIME);
TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
msg_send_expect_success (radio, "TV", "Friends");
// Looping is disabled, we shouldn't receive anything
msleep (SETTLE_TIME);
zmq_msg_t msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
int rc = zmq_msg_recv (&msg, dish, ZMQ_DONTWAIT);
zmq_msg_close (&msg);
TEST_ASSERT_EQUAL_INT (rc, -1);
TEST_ASSERT_EQUAL_INT (errno, EAGAIN);
test_context_socket_close (dish);
test_context_socket_close (radio);
}
MAKE_TEST_V4V6 (test_radio_dish_no_loop)
int main (void) int main (void)
{ {
setup_test_environment (); setup_test_environment ();
...@@ -268,5 +500,18 @@ int main (void) ...@@ -268,5 +500,18 @@ int main (void)
RUN_TEST (test_radio_dish_udp_ipv4); RUN_TEST (test_radio_dish_udp_ipv4);
RUN_TEST (test_radio_dish_udp_ipv6); RUN_TEST (test_radio_dish_udp_ipv6);
bool ipv4_mcast = is_multicast_available (false);
bool ipv6_mcast = is_ipv6_available () && is_multicast_available (true);
if (ipv4_mcast) {
RUN_TEST (test_radio_dish_mcast_ipv4);
RUN_TEST (test_radio_dish_no_loop_ipv4);
}
if (ipv6_mcast) {
RUN_TEST (test_radio_dish_mcast_ipv6);
RUN_TEST (test_radio_dish_no_loop_ipv6);
}
return UNITY_END (); return UNITY_END ();
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment