Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
0f896fcd
Unverified
Commit
0f896fcd
authored
May 10, 2018
by
Luca Boccassi
Committed by
GitHub
May 10, 2018
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #3087 from simias/mcast_loop
Problem: ZMQ doesn't expose the MULTICAST_LOOP socket option
parents
e9211aed
4b635c3d
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
331 additions
and
5 deletions
+331
-5
zmq_getsockopt.txt
doc/zmq_getsockopt.txt
+12
-0
zmq_setsockopt.txt
doc/zmq_setsockopt.txt
+11
-0
zmq_udp.txt
doc/zmq_udp.txt
+22
-5
zmq.h
include/zmq.h
+1
-0
options.cpp
src/options.cpp
+13
-0
options.hpp
src/options.hpp
+3
-0
udp_engine.cpp
src/udp_engine.cpp
+23
-0
zmq_draft.h
src/zmq_draft.h
+1
-0
test_radio_dish.cpp
tests/test_radio_dish.cpp
+245
-0
No files found.
doc/zmq_getsockopt.txt
View file @
0f896fcd
...
@@ -892,6 +892,18 @@ Default value:: -1
...
@@ -892,6 +892,18 @@ Default value:: -1
Applicable socket types:: all, when using VMCI transport
Applicable socket types:: all, when using VMCI transport
ZMQ_MULTICAST_LOOP: Retrieve multicast local loopback configuration
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the current multicast loopback configuration. A value of `1`
means that the multicast packets sent on this socket will be looped
back to local listening interface.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 1
Applicable socket types:: ZMQ_RADIO, when using UDP multicast transport
RETURN VALUE
RETURN VALUE
------------
------------
The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it
The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it
...
...
doc/zmq_setsockopt.txt
View file @
0f896fcd
...
@@ -1273,6 +1273,17 @@ Default value:: -1
...
@@ -1273,6 +1273,17 @@ Default value:: -1
Applicable socket types:: all, when using VMCI transport
Applicable socket types:: all, when using VMCI transport
ZMQ_MULTICAST_LOOP: Control multicast local loopback
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
For multicast UDP sender sockets this option sets whether the data
sent should be looped back on local listening sockets.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 1
Applicable socket types:: ZMQ_RADIO, when using UDP multicast transport
RETURN VALUE
RETURN VALUE
------------
------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
...
...
doc/zmq_udp.txt
View file @
0f896fcd
...
@@ -37,11 +37,12 @@ colon and the UDP port number to use.
...
@@ -37,11 +37,12 @@ colon and the UDP port number to use.
An 'interface' may be specified by either of the following:
An 'interface' may be specified by either of the following:
* The wild-card `*`, meaning all available interfaces.
* The wild-card `*`, meaning all available interfaces.
* The
primary IPv4 address assigned to the interface, in its numeric
* The
name of the network interface (i.e. eth0, lo, wlan0 etc...)
representation.
* The primary address assigned to the interface, in its numeric
representation.
* Multicast address in its numeric representation the socket should join.
* Multicast address in its numeric representation the socket should join.
The UDP port number may be specified a numeric value, usually above 1024 on POSIX systems.
The UDP port number may be specified a numeric value, usually above
1024 on POSIX systems.
Connecting a socket
Connecting a socket
~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~
...
@@ -52,7 +53,8 @@ a colon and the UDP port number to use.
...
@@ -52,7 +53,8 @@ a colon and the UDP port number to use.
A 'peer address' may be specified by either of the following:
A 'peer address' may be specified by either of the following:
* The IPv4 or IPv6 address of the peer, in its numeric representation.
* The IPv4 or IPv6 address of the peer, in its numeric representation
or using its hostname.
* Multicast address in its numeric representation.
* Multicast address in its numeric representation.
EXAMPLES
EXAMPLES
...
@@ -65,9 +67,18 @@ assert (rc == 0);
...
@@ -65,9 +67,18 @@ assert (rc == 0);
// Unicast - UDP port 5555 on the local loop-back interface
// Unicast - UDP port 5555 on the local loop-back interface
rc = zmq_bind(dish, "udp://127.0.0.1:5555");
rc = zmq_bind(dish, "udp://127.0.0.1:5555");
assert (rc == 0);
assert (rc == 0);
// Unicast - UDP port 5555 on interface eth1
rc = zmq_bind(dish, "udp://eth1:5555");
assert (rc == 0);
// Multicast - UDP port 5555 on a Multicast address
// Multicast - UDP port 5555 on a Multicast address
rc = zmq_bind(dish, "udp://239.0.0.1:5555");
rc = zmq_bind(dish, "udp://239.0.0.1:5555");
assert (rc == 0);
assert (rc == 0);
// Same as above but joining only on interface eth0
rc = zmq_bind(dish, "udp://eth0;239.0.0.1:5555");
assert (rc == 0);
// Same as above using IPv6 multicast
rc = zmq_bind(dish, "udp://eth0;[ff02::1]:5555");
assert (rc == 0);
----
----
...
@@ -76,9 +87,15 @@ assert (rc == 0);
...
@@ -76,9 +87,15 @@ assert (rc == 0);
// Connecting using an Unicast IP address
// Connecting using an Unicast IP address
rc = zmq_connect(radio, "udp://192.168.1.1:5555");
rc = zmq_connect(radio, "udp://192.168.1.1:5555");
assert (rc == 0);
assert (rc == 0);
// Connecting using a Multicast address
"
// Connecting using a Multicast address
rc = zmq_connect(socket, "udp://239.0.0.1:5555);
rc = zmq_connect(socket, "udp://239.0.0.1:5555);
assert (rc == 0);
assert (rc == 0);
// Connecting using a Multicast address using local interface wlan0
rc = zmq_connect(socket, "udp://wlan0;239.0.0.1:5555);
assert (rc == 0);
// Connecting to IPv6 multicast
rc = zmq_connect(socket, "udp://[ff02::1]:5555);
assert (rc == 0);
----
----
...
...
include/zmq.h
View file @
0f896fcd
...
@@ -596,6 +596,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread);
...
@@ -596,6 +596,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread);
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_LOOPBACK_FASTPATH 94
#define ZMQ_LOOPBACK_FASTPATH 94
#define ZMQ_METADATA 95
#define ZMQ_METADATA 95
#define ZMQ_MULTICAST_LOOP 96
/* DRAFT 0MQ socket events and monitoring */
/* DRAFT 0MQ socket events and monitoring */
/* Unspecified system errors during handshake. Event value is an errno. */
/* Unspecified system errors during handshake. Event value is an errno. */
...
...
src/options.cpp
View file @
0f896fcd
...
@@ -232,6 +232,7 @@ zmq::options_t::options_t () :
...
@@ -232,6 +232,7 @@ zmq::options_t::options_t () :
use_fd
(
-
1
),
use_fd
(
-
1
),
zap_enforce_domain
(
false
),
zap_enforce_domain
(
false
),
loopback_fastpath
(
false
),
loopback_fastpath
(
false
),
multicast_loop
(
true
),
zero_copy
(
true
)
zero_copy
(
true
)
{
{
memset
(
curve_public_key
,
0
,
CURVE_KEYSIZE
);
memset
(
curve_public_key
,
0
,
CURVE_KEYSIZE
);
...
@@ -726,6 +727,11 @@ int zmq::options_t::setsockopt (int option_,
...
@@ -726,6 +727,11 @@ int zmq::options_t::setsockopt (int option_,
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
break
;
break
;
case
ZMQ_MULTICAST_LOOP
:
return
do_setsockopt_int_as_bool_relaxed
(
optval_
,
optvallen_
,
&
multicast_loop
);
default
:
default
:
#if defined(ZMQ_ACT_MILITANT)
#if defined(ZMQ_ACT_MILITANT)
// There are valid scenarios for probing with unknown socket option
// There are valid scenarios for probing with unknown socket option
...
@@ -1120,6 +1126,13 @@ int zmq::options_t::getsockopt (int option_,
...
@@ -1120,6 +1126,13 @@ int zmq::options_t::getsockopt (int option_,
}
}
break
;
break
;
case
ZMQ_MULTICAST_LOOP
:
if
(
is_int
)
{
*
value
=
multicast_loop
;
return
0
;
}
break
;
default
:
default
:
#if defined(ZMQ_ACT_MILITANT)
#if defined(ZMQ_ACT_MILITANT)
malformed
=
false
;
malformed
=
false
;
...
...
src/options.hpp
View file @
0f896fcd
...
@@ -257,6 +257,9 @@ struct options_t
...
@@ -257,6 +257,9 @@ struct options_t
// Use of loopback fastpath.
// Use of loopback fastpath.
bool
loopback_fastpath
;
bool
loopback_fastpath
;
// Loop sent multicast packets to local sockets
bool
multicast_loop
;
// Use zero copy strategy for storing message content when decoding.
// Use zero copy strategy for storing message content when decoding.
bool
zero_copy
;
bool
zero_copy
;
...
...
src/udp_engine.cpp
View file @
0f896fcd
...
@@ -119,6 +119,29 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
...
@@ -119,6 +119,29 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
const
ip_addr_t
*
out
=
address
->
resolved
.
udp_addr
->
target_addr
();
const
ip_addr_t
*
out
=
address
->
resolved
.
udp_addr
->
target_addr
();
out_address
=
out
->
as_sockaddr
();
out_address
=
out
->
as_sockaddr
();
out_addrlen
=
out
->
sockaddr_len
();
out_addrlen
=
out
->
sockaddr_len
();
if
(
out
->
is_multicast
())
{
int
level
;
int
optname
;
if
(
out
->
family
()
==
AF_INET6
)
{
level
=
IPPROTO_IPV6
;
optname
=
IPV6_MULTICAST_LOOP
;
}
else
{
level
=
IPPROTO_IP
;
optname
=
IP_MULTICAST_LOOP
;
}
int
loop
=
options
.
multicast_loop
;
int
rc
=
setsockopt
(
fd
,
level
,
optname
,
(
char
*
)
&
loop
,
sizeof
(
loop
));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert
(
rc
!=
SOCKET_ERROR
);
#else
errno_assert
(
rc
==
0
);
#endif
}
}
else
{
}
else
{
/// XXX fixme ?
/// XXX fixme ?
out_address
=
(
sockaddr
*
)
&
raw_address
;
out_address
=
(
sockaddr
*
)
&
raw_address
;
...
...
src/zmq_draft.h
View file @
0f896fcd
...
@@ -54,6 +54,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_);
...
@@ -54,6 +54,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_);
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_LOOPBACK_FASTPATH 94
#define ZMQ_LOOPBACK_FASTPATH 94
#define ZMQ_METADATA 95
#define ZMQ_METADATA 95
#define ZMQ_MULTICAST_LOOP 96
/* DRAFT 0MQ socket events and monitoring */
/* DRAFT 0MQ socket events and monitoring */
/* Unspecified system errors during handshake. Event value is an errno. */
/* Unspecified system errors during handshake. Event value is an errno. */
...
...
tests/test_radio_dish.cpp
View file @
0f896fcd
...
@@ -251,6 +251,238 @@ void test_radio_dish_udp (int ipv6_)
...
@@ -251,6 +251,238 @@ void test_radio_dish_udp (int ipv6_)
}
}
MAKE_TEST_V4V6
(
test_radio_dish_udp
)
MAKE_TEST_V4V6
(
test_radio_dish_udp
)
#define MCAST_IPV4 "226.8.5.5"
#define MCAST_IPV6 "ff02::7a65:726f:6df1:0a01"
static
const
char
*
mcast_url
(
int
ipv6_
)
{
if
(
ipv6_
)
{
return
"udp://["
MCAST_IPV6
"]:5555"
;
}
else
{
return
"udp://["
MCAST_IPV4
"]:5555"
;
}
}
// OSX uses a different name for this socket option
#ifndef IPV6_ADD_MEMBERSHIP
#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif
// Test if multicast is available on this machine by attempting to
// send a receive a multicast datagram
static
bool
is_multicast_available
(
int
ipv6_
)
{
int
family
=
ipv6_
?
AF_INET6
:
AF_INET
;
int
bind_sock
=
-
1
;
int
send_sock
=
-
1
;
int
port
=
5555
;
bool
success
=
false
;
const
char
*
msg
=
"it works"
;
char
buf
[
32
];
struct
sockaddr_storage
any
;
struct
sockaddr_storage
mcast
;
socklen_t
sl
;
int
rc
;
if
(
ipv6_
)
{
struct
sockaddr_in6
*
any_ipv6
=
(
struct
sockaddr_in6
*
)
&
any
;
struct
sockaddr_in6
*
mcast_ipv6
=
(
struct
sockaddr_in6
*
)
&
mcast
;
any_ipv6
->
sin6_family
=
AF_INET6
;
any_ipv6
->
sin6_port
=
htons
(
port
);
any_ipv6
->
sin6_flowinfo
=
0
;
any_ipv6
->
sin6_scope_id
=
0
;
rc
=
inet_pton
(
AF_INET6
,
"::"
,
&
any_ipv6
->
sin6_addr
);
if
(
rc
==
0
)
{
goto
out
;
}
*
mcast_ipv6
=
*
any_ipv6
;
rc
=
inet_pton
(
AF_INET6
,
MCAST_IPV6
,
&
mcast_ipv6
->
sin6_addr
);
if
(
rc
==
0
)
{
goto
out
;
}
sl
=
sizeof
(
*
any_ipv6
);
}
else
{
struct
sockaddr_in
*
any_ipv4
=
(
struct
sockaddr_in
*
)
&
any
;
struct
sockaddr_in
*
mcast_ipv4
=
(
struct
sockaddr_in
*
)
&
mcast
;
any_ipv4
->
sin_family
=
AF_INET
;
any_ipv4
->
sin_port
=
htons
(
5555
);
rc
=
inet_pton
(
AF_INET
,
"0.0.0.0"
,
&
any_ipv4
->
sin_addr
);
if
(
rc
==
0
)
{
goto
out
;
}
*
mcast_ipv4
=
*
any_ipv4
;
rc
=
inet_pton
(
AF_INET
,
MCAST_IPV4
,
&
mcast_ipv4
->
sin_addr
);
if
(
rc
==
0
)
{
goto
out
;
}
sl
=
sizeof
(
*
any_ipv4
);
}
bind_sock
=
socket
(
family
,
SOCK_DGRAM
,
IPPROTO_UDP
);
if
(
bind_sock
<
0
)
{
goto
out
;
}
send_sock
=
socket
(
family
,
SOCK_DGRAM
,
IPPROTO_UDP
);
if
(
bind_sock
<
0
)
{
goto
out
;
}
rc
=
bind
(
bind_sock
,
(
struct
sockaddr
*
)
&
any
,
sl
);
if
(
rc
<
0
)
{
goto
out
;
}
if
(
ipv6_
)
{
struct
ipv6_mreq
mreq
;
struct
sockaddr_in6
*
mcast_ipv6
=
(
struct
sockaddr_in6
*
)
&
mcast
;
mreq
.
ipv6mr_multiaddr
=
mcast_ipv6
->
sin6_addr
;
mreq
.
ipv6mr_interface
=
0
;
rc
=
setsockopt
(
bind_sock
,
IPPROTO_IPV6
,
IPV6_ADD_MEMBERSHIP
,
&
mreq
,
sizeof
(
mreq
));
if
(
rc
<
0
)
{
goto
out
;
}
int
loop
=
1
;
rc
=
setsockopt
(
send_sock
,
IPPROTO_IPV6
,
IPV6_MULTICAST_LOOP
,
&
loop
,
sizeof
(
loop
));
if
(
rc
<
0
)
{
goto
out
;
}
}
else
{
struct
ip_mreq
mreq
;
struct
sockaddr_in
*
mcast_ipv4
=
(
struct
sockaddr_in
*
)
&
mcast
;
mreq
.
imr_multiaddr
=
mcast_ipv4
->
sin_addr
;
mreq
.
imr_interface
.
s_addr
=
htonl
(
INADDR_ANY
);
rc
=
setsockopt
(
bind_sock
,
IPPROTO_IP
,
IP_ADD_MEMBERSHIP
,
&
mreq
,
sizeof
(
mreq
));
if
(
rc
<
0
)
{
goto
out
;
}
int
loop
=
1
;
rc
=
setsockopt
(
send_sock
,
IPPROTO_IP
,
IP_MULTICAST_LOOP
,
&
loop
,
sizeof
(
loop
));
if
(
rc
<
0
)
{
goto
out
;
}
}
msleep
(
SETTLE_TIME
);
rc
=
sendto
(
send_sock
,
msg
,
strlen
(
msg
),
0
,
(
struct
sockaddr
*
)
&
mcast
,
sl
);
if
(
rc
<
0
)
{
goto
out
;
}
msleep
(
SETTLE_TIME
);
rc
=
recvfrom
(
bind_sock
,
buf
,
sizeof
(
buf
)
-
1
,
0
,
NULL
,
0
);
if
(
rc
<
0
)
{
goto
out
;
}
buf
[
rc
]
=
'\0'
;
success
=
(
strcmp
(
msg
,
buf
)
==
0
);
out
:
if
(
bind_sock
>=
0
)
{
close
(
bind_sock
);
}
if
(
send_sock
>=
0
)
{
close
(
send_sock
);
}
return
success
;
}
static
void
test_radio_dish_mcast
(
int
ipv6_
)
{
void
*
radio
=
test_context_socket
(
ZMQ_RADIO
);
void
*
dish
=
test_context_socket
(
ZMQ_DISH
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
radio
,
ZMQ_IPV6
,
&
ipv6_
,
sizeof
(
int
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
dish
,
ZMQ_IPV6
,
&
ipv6_
,
sizeof
(
int
)));
const
char
*
url
=
mcast_url
(
ipv6_
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
dish
,
url
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
radio
,
url
));
msleep
(
SETTLE_TIME
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_join
(
dish
,
"TV"
));
msg_send_expect_success
(
radio
,
"TV"
,
"Friends"
);
msg_recv_cmp
(
dish
,
"TV"
,
"Friends"
);
test_context_socket_close
(
dish
);
test_context_socket_close
(
radio
);
}
MAKE_TEST_V4V6
(
test_radio_dish_mcast
)
static
void
test_radio_dish_no_loop
(
int
ipv6_
)
{
void
*
radio
=
test_context_socket
(
ZMQ_RADIO
);
void
*
dish
=
test_context_socket
(
ZMQ_DISH
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
radio
,
ZMQ_IPV6
,
&
ipv6_
,
sizeof
(
int
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
dish
,
ZMQ_IPV6
,
&
ipv6_
,
sizeof
(
int
)));
// Disable multicast loop
int
loop
=
0
;
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
radio
,
ZMQ_MULTICAST_LOOP
,
&
loop
,
sizeof
(
int
)));
const
char
*
url
=
mcast_url
(
ipv6_
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
dish
,
url
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
radio
,
url
));
msleep
(
SETTLE_TIME
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_join
(
dish
,
"TV"
));
msg_send_expect_success
(
radio
,
"TV"
,
"Friends"
);
// Looping is disabled, we shouldn't receive anything
msleep
(
SETTLE_TIME
);
zmq_msg_t
msg
;
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_init
(
&
msg
));
int
rc
=
zmq_msg_recv
(
&
msg
,
dish
,
ZMQ_DONTWAIT
);
zmq_msg_close
(
&
msg
);
TEST_ASSERT_EQUAL_INT
(
rc
,
-
1
);
TEST_ASSERT_EQUAL_INT
(
errno
,
EAGAIN
);
test_context_socket_close
(
dish
);
test_context_socket_close
(
radio
);
}
MAKE_TEST_V4V6
(
test_radio_dish_no_loop
)
int
main
(
void
)
int
main
(
void
)
{
{
setup_test_environment
();
setup_test_environment
();
...
@@ -268,5 +500,18 @@ int main (void)
...
@@ -268,5 +500,18 @@ int main (void)
RUN_TEST
(
test_radio_dish_udp_ipv4
);
RUN_TEST
(
test_radio_dish_udp_ipv4
);
RUN_TEST
(
test_radio_dish_udp_ipv6
);
RUN_TEST
(
test_radio_dish_udp_ipv6
);
bool
ipv4_mcast
=
is_multicast_available
(
false
);
bool
ipv6_mcast
=
is_ipv6_available
()
&&
is_multicast_available
(
true
);
if
(
ipv4_mcast
)
{
RUN_TEST
(
test_radio_dish_mcast_ipv4
);
RUN_TEST
(
test_radio_dish_no_loop_ipv4
);
}
if
(
ipv6_mcast
)
{
RUN_TEST
(
test_radio_dish_mcast_ipv6
);
RUN_TEST
(
test_radio_dish_no_loop_ipv6
);
}
return
UNITY_END
();
return
UNITY_END
();
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment