Commit b60689e5 authored by Chuck Remes's avatar Chuck Remes

Merge pull request #508 from hintjens/master

IPv6 related changes and cleanups to test cases
parents 049931fc 309740e1
......@@ -42,6 +42,7 @@ tests/test_connect_resolve
tests/test_connect_delay
tests/test_term_endpoint
tests/test_router_mandatory
tests/test_disconnect_inproc
tests/test_raw_sock
tests/test_disconnect_inproc
src/platform.hpp*
......
......@@ -324,12 +324,24 @@ Default value:: -1 (infinite)
Applicable socket types:: all
ZMQ_IPV6: Retrieve IPv6 socket status
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the IPv6 option for the socket. A value of `1` means IPv6 is
enabled on the socket, while `0` means the socket will use only IPv4.
When IPv6 is enabled the socket will connect to, or accept connections
from, both IPv4 and IPv6 hosts.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: all, when using TCP transports.
ZMQ_IPV4ONLY: Retrieve IPv4-only socket override status
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the underlying native socket type. A value of `1` will use IPv4
sockets, while the value of `0` will use IPv6 sockets. An IPv6 socket
lets applications connect to and accept connections from both IPv4 and IPv6
hosts.
Retrieve the IPv4-only option for the socket. This option is deprecated.
Please use the ZMQ_IPV6 option.
[horizontal]
Option value type:: int
......@@ -470,6 +482,7 @@ Option value unit:: -1,>0
Default value:: -1 (leave to OS default)
Applicable socket types:: all, when using TCP transports.
RETURN VALUE
------------
The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it
......
......@@ -342,12 +342,26 @@ Default value:: -1 (infinite)
Applicable socket types:: all
ZMQ_IPV4ONLY: Use IPv4-only sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ZMQ_IPV6: Enable IPv6 on socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the underlying native socket type. A value of `1` will use IPv4 sockets,
while the value of `0` will use IPv6 sockets. An IPv6 socket lets
applications connect to and accept connections from both IPv4 and IPv6 hosts.
Set the IPv6 option for the socket. A value of `1` means IPv6 is
enabled on the socket, while `0` means the socket will use only IPv4.
When IPv6 is enabled the socket will connect to, or accept connections
from, both IPv4 and IPv6 hosts.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: all, when using TCP transports.
ZMQ_IPV4ONLY: Use IPv4-only on socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Set the IPv4-only ootion for the socket. This option is deprecated.
Please use the ZMQ_IPV6 option.
[horizontal]
Option value type:: int
......
......@@ -241,7 +241,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_MULTICAST_HOPS 25
#define ZMQ_RCVTIMEO 27
#define ZMQ_SNDTIMEO 28
#define ZMQ_IPV4ONLY 31
#define ZMQ_IPV4ONLY 31 /* Request replacement by IPV6 */
#define ZMQ_LAST_ENDPOINT 32
#define ZMQ_ROUTER_MANDATORY 33
#define ZMQ_TCP_KEEPALIVE 34
......@@ -252,6 +252,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_DELAY_ATTACH_ON_CONNECT 39
#define ZMQ_XPUB_VERBOSE 40
#define ZMQ_ROUTER_RAW 41
#define ZMQ_IPV6 42
/* Message options */
......
......@@ -43,7 +43,7 @@ zmq::options_t::options_t () :
maxmsgsize (-1),
rcvtimeo (-1),
sndtimeo (-1),
ipv4only (1),
ipv6 (0),
delay_attach_on_connect (0),
delay_on_close (true),
delay_on_disconnect (true),
......@@ -61,282 +61,205 @@ zmq::options_t::options_t () :
int zmq::options_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
switch (option_) {
bool valid = true;
bool is_int = (optvallen_ == sizeof (int));
int value = is_int? *((int *) optval_): 0;
switch (option_) {
case ZMQ_SNDHWM:
if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) {
errno = EINVAL;
return -1;
}
sndhwm = *((int*) optval_);
return 0;
if (is_int && value >= 0)
sndhwm = value;
else
valid = false;
break;
case ZMQ_RCVHWM:
if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) {
errno = EINVAL;
return -1;
}
rcvhwm = *((int*) optval_);
return 0;
if (is_int && value >= 0)
rcvhwm = value;
else
valid = false;
break;
case ZMQ_AFFINITY:
if (optvallen_ != sizeof (uint64_t)) {
errno = EINVAL;
return -1;
}
if (optvallen_ == sizeof (uint64_t))
affinity = *((uint64_t*) optval_);
return 0;
else
valid = false;
break;
case ZMQ_IDENTITY:
// Empty identity is invalid as well as identity longer than
// 255 bytes. Identity starting with binary zero is invalid
// as these are used for auto-generated identities.
if (optvallen_ < 1 || optvallen_ > 255 ||
*((const unsigned char*) optval_) == 0) {
errno = EINVAL;
return -1;
}
if (optvallen_ > 0 && optvallen_ < 256
&& *((const unsigned char *) optval_) != 0) {
identity_size = optvallen_;
memcpy (identity, optval_, identity_size);
return 0;
}
else
valid = false;
break;
case ZMQ_RATE:
if (optvallen_ != sizeof (int) || *((int*) optval_) <= 0) {
errno = EINVAL;
return -1;
}
rate = *((int*) optval_);
return 0;
if (is_int && value > 0)
rate = value;
else
valid = false;
break;
case ZMQ_RECOVERY_IVL:
if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) {
errno = EINVAL;
return -1;
}
recovery_ivl = *((int*) optval_);
return 0;
if (is_int && value >= 0)
recovery_ivl = value;
else
valid = false;
case ZMQ_SNDBUF:
if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) {
errno = EINVAL;
return -1;
}
sndbuf = *((int*) optval_);
return 0;
if (is_int && value >= 0)
sndbuf = value;
else
valid = false;
break;
case ZMQ_RCVBUF:
if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) {
errno = EINVAL;
return -1;
}
rcvbuf = *((int*) optval_);
return 0;
if (is_int && value >= 0)
rcvbuf = value;
else
valid = false;
break;
case ZMQ_LINGER:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
linger = *((int*) optval_);
return 0;
if (is_int && value >= -1)
linger = value;
else
valid = false;
break;
case ZMQ_RECONNECT_IVL:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
if (*((int*) optval_) < -1) {
errno = EINVAL;
return -1;
}
reconnect_ivl = *((int*) optval_);
return 0;
if (is_int && value >= -1)
reconnect_ivl = value;
else
valid = false;
break;
case ZMQ_RECONNECT_IVL_MAX:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
if (*((int*) optval_) < 0) {
errno = EINVAL;
return -1;
}
reconnect_ivl_max = *((int*) optval_);
return 0;
if (is_int && value >= 0)
reconnect_ivl_max = value;
else
valid = false;
break;
case ZMQ_BACKLOG:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
backlog = *((int*) optval_);
return 0;
if (is_int && value >= 0)
backlog = value;
else
valid = false;
break;
case ZMQ_MAXMSGSIZE:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
maxmsgsize = *((int64_t*) optval_);
return 0;
if (optvallen_ == sizeof (int64_t))
maxmsgsize = *((int64_t *) optval_);
else
valid = false;
case ZMQ_MULTICAST_HOPS:
if (optvallen_ != sizeof (int) || *((int*) optval_) <= 0) {
errno = EINVAL;
return -1;
}
multicast_hops = *((int*) optval_);
return 0;
if (is_int && value > 0)
multicast_hops = value;
else
valid = false;
break;
case ZMQ_RCVTIMEO:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
rcvtimeo = *((int*) optval_);
return 0;
if (is_int && value >= -1)
rcvtimeo = value;
else
valid = false;
break;
case ZMQ_SNDTIMEO:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
sndtimeo = *((int*) optval_);
return 0;
if (is_int && value >= -1)
sndtimeo = value;
else
valid = false;
break;
/* Deprecated in favor of ZMQ_IPV6 */
case ZMQ_IPV4ONLY:
{
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
int val = *((int*) optval_);
if (val != 0 && val != 1) {
errno = EINVAL;
return -1;
}
ipv4only = val;
return 0;
}
if (is_int && (value == 0 || value == 1))
ipv6 = 1 - value;
else
valid = false;
break;
/* To replace the somewhat surprising IPV4ONLY */
case ZMQ_IPV6:
if (is_int && (value == 0 || value == 1))
ipv6 = value;
else
valid = false;
break;
case ZMQ_TCP_KEEPALIVE:
{
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
int val = *((int*) optval_);
if (val != -1 && val != 0 && val != 1) {
errno = EINVAL;
return -1;
}
#if defined ZMQ_HAVE_SO_KEEPALIVE
tcp_keepalive = val;
#endif
return 0;
}
case ZMQ_DELAY_ATTACH_ON_CONNECT:
{
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
int val = *((int*) optval_);
if (val != 0 && val != 1) {
errno = EINVAL;
return -1;
}
delay_attach_on_connect = val;
return 0;
}
if (is_int && (value >= -1 || value <= 1))
tcp_keepalive = value;
else
valid = false;
break;
case ZMQ_TCP_KEEPALIVE_CNT:
{
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
int val = *((int*) optval_);
if (val <= 0 && val != -1) {
errno = EINVAL;
return -1;
}
#if defined ZMQ_HAVE_SO_KEEPALIVE && defined ZMQ_HAVE_TCP_KEEPCNT
tcp_keepalive_cnt = val;
#endif
return 0;
}
if (is_int && (value == -1 || value >= 0))
tcp_keepalive_cnt = value;
else
valid = false;
break;
case ZMQ_TCP_KEEPALIVE_IDLE:
{
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
int val = *((int*) optval_);
if (val <= 0 && val != -1) {
errno = EINVAL;
return -1;
}
#if defined ZMQ_HAVE_SO_KEEPALIVE && (defined ZMQ_HAVE_TCP_KEEPIDLE || defined ZMQ_HAVE_TCP_KEEPALIVE)
tcp_keepalive_idle = val;
#endif
return 0;
}
if (is_int && (value == -1 || value >= 0))
tcp_keepalive_idle = value;
else
valid = false;
break;
case ZMQ_TCP_KEEPALIVE_INTVL:
{
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
int val = *((int*) optval_);
if (val <= 0 && val != -1) {
errno = EINVAL;
return -1;
}
#if defined ZMQ_HAVE_SO_KEEPALIVE && defined ZMQ_HAVE_TCP_KEEPINTVL
tcp_keepalive_intvl = val;
#endif
return 0;
}
if (is_int && (value == -1 || value >= 0))
tcp_keepalive_intvl = value;
else
valid = false;
break;
case ZMQ_DELAY_ATTACH_ON_CONNECT:
if (is_int && (value == 0 || value == 1))
delay_attach_on_connect = value;
else
valid = false;
break;
case ZMQ_TCP_ACCEPT_FILTER:
{
if (optvallen_ == 0 && optval_ == NULL) {
if (optvallen_ == 0 && optval_ == NULL)
tcp_accept_filters.clear ();
return 0;
}
else
if (optvallen_ < 1 || optvallen_ > 255 || optval_ == NULL || *((const char*) optval_) == 0) {
errno = EINVAL;
return -1;
}
if (optvallen_ < 1 || optvallen_ > 255 || optval_ == NULL || *((const char*) optval_) == 0)
valid = false;
else {
std::string filter_str ((const char*) optval_, optvallen_);
std::string filter_str ((const char *) optval_, optvallen_);
tcp_address_mask_t mask;
int rc = mask.resolve (filter_str.c_str (), ipv4only ? true : false);
if (rc != 0) {
errno = EINVAL;
return -1;
int rc = mask.resolve (filter_str.c_str (), ipv6);
if (rc == 0)
tcp_accept_filters.push_back (mask);
else
valid = false;
}
tcp_accept_filters.push_back(mask);
break;
return 0;
}
}
default:
{
valid = false;
break;
}
if (valid)
return 0;
else {
errno = EINVAL;
return -1;
}
}
}
int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
......@@ -501,7 +424,16 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
errno = EINVAL;
return -1;
}
*((int*) optval_) = ipv4only;
*((int*) optval_) = 1 - ipv6;
*optvallen_ = sizeof (int);
return 0;
case ZMQ_IPV6:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = ipv6;
*optvallen_ = sizeof (int);
return 0;
......@@ -551,7 +483,7 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
return 0;
case ZMQ_LAST_ENDPOINT:
// don't allow string which cannot contain the entire message
/* don't allow string which cannot contain the entire message */
if (*optvallen_ < last_endpoint.size() + 1) {
errno = EINVAL;
return -1;
......
......@@ -92,10 +92,8 @@ namespace zmq
int rcvtimeo;
int sndtimeo;
// If 1, indicates the use of IPv4 sockets only, it will not be
// possible to communicate with IPv6-only hosts. If 0, the socket can
// connect to and accept connections from both IPv4 and IPv6 hosts.
int ipv4only;
// If true, IPv6 is enabled (as well as IPv4)
bool ipv6;
// If 1, connecting pipes are not attached immediately, meaning a send()
// on a socket with only connecting pipes would block
......
......@@ -499,7 +499,7 @@ int zmq::socket_base_t::connect (const char *addr_)
paddr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
alloc_assert (paddr->resolved.tcp_addr);
int rc = paddr->resolved.tcp_addr->resolve (
address.c_str (), false, options.ipv4only ? true : false);
address.c_str (), false, options.ipv6);
if (rc != 0) {
delete paddr;
return -1;
......
......@@ -52,10 +52,10 @@
#include <stdlib.h>
// On Solaris platform, network interface name can be queried by ioctl.
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_)
{
// TODO: Unused parameter, IPv6 support not implemented for Solaris.
(void) ipv4only_;
(void) ipv6_;
// Create a socket.
int fd = open_socket (AF_INET, SOCK_DGRAM, 0);
......@@ -106,7 +106,6 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
errno = ENODEV;
return -1;
}
return 0;
}
......@@ -117,10 +116,10 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
#include <sys/ioctl.h>
#include <net/if.h>
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_)
{
// TODO: Unused parameter, IPv6 support not implemented for AIX or HP/UX.
(void) ipv4only_;
(void) ipv6_;
// Create a socket.
int sd = open_socket (AF_INET, SOCK_DGRAM, 0);
......@@ -141,7 +140,6 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
errno = ENODEV;
return -1;
}
memcpy (&address.ipv4.sin_addr, &((sockaddr_in*) &ifr.ifr_addr)->sin_addr,
sizeof (in_addr));
......@@ -157,10 +155,10 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
// On these platforms, network interface name can be queried
// using getifaddrs function.
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_)
{
// Get the addresses.
ifaddrs* ifa = NULL;
ifaddrs *ifa = NULL;
int rc = getifaddrs (&ifa);
errno_assert (rc == 0);
zmq_assert (ifa != NULL);
......@@ -173,11 +171,8 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
continue;
int family = ifp->ifa_addr->sa_family;
if ((family == AF_INET
|| (!ipv4only_ && family == AF_INET6))
&& !strcmp (nic_, ifp->ifa_name))
{
if ((family == AF_INET || (ipv6_ && family == AF_INET6))
&& !strcmp (nic_, ifp->ifa_name)) {
memcpy (&address, ifp->ifa_addr,
(family == AF_INET) ? sizeof (struct sockaddr_in)
: sizeof (struct sockaddr_in6));
......@@ -193,7 +188,6 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
errno = ENODEV;
return -1;
}
return 0;
}
......@@ -201,11 +195,11 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
// On other platforms we assume there are no sane interface names.
// This is true especially of Windows.
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_)
{
// All unused parameters.
(void) nic_;
(void) ipv4only_;
(void) ipv6_;
errno = ENODEV;
return -1;
......@@ -213,8 +207,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
#endif
int zmq::tcp_address_t::resolve_interface (const char *interface_,
bool ipv4only_)
int zmq::tcp_address_t::resolve_interface (const char *interface_, bool ipv6_)
{
// Initialize temporary output pointers with storage address.
sockaddr_storage ss;
......@@ -223,15 +216,7 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_,
// Initialise IP-format family/port and populate temporary output pointers
// with the address.
if (ipv4only_) {
sockaddr_in ip4_addr;
memset (&ip4_addr, 0, sizeof (ip4_addr));
ip4_addr.sin_family = AF_INET;
ip4_addr.sin_addr.s_addr = htonl (INADDR_ANY);
out_addrlen = sizeof ip4_addr;
memcpy (out_addr, &ip4_addr, out_addrlen);
}
else {
if (ipv6_) {
sockaddr_in6 ip6_addr;
memset (&ip6_addr, 0, sizeof (ip6_addr));
ip6_addr.sin6_family = AF_INET6;
......@@ -239,8 +224,15 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_,
out_addrlen = sizeof ip6_addr;
memcpy (out_addr, &ip6_addr, out_addrlen);
}
// * resolves to INADDR_ANY or in6addr_any.
else {
sockaddr_in ip4_addr;
memset (&ip4_addr, 0, sizeof (ip4_addr));
ip4_addr.sin_family = AF_INET;
ip4_addr.sin_addr.s_addr = htonl (INADDR_ANY);
out_addrlen = sizeof ip4_addr;
memcpy (out_addr, &ip4_addr, out_addrlen);
}
// "*" resolves to INADDR_ANY or in6addr_any.
if (strcmp (interface_, "*") == 0) {
zmq_assert (out_addrlen <= sizeof address);
memcpy (&address, out_addr, out_addrlen);
......@@ -248,7 +240,7 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_,
}
// Try to resolve the string as a NIC name.
int rc = resolve_nic_name (interface_, ipv4only_);
int rc = resolve_nic_name (interface_, ipv6_);
if (rc != 0 && errno != ENODEV)
return rc;
if (rc == 0)
......@@ -266,7 +258,7 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_,
// Choose IPv4 or IPv6 protocol family. Note that IPv6 allows for
// IPv4-in-IPv6 addresses.
req.ai_family = ipv4only_ ? AF_INET : AF_INET6;
req.ai_family = ipv6_? AF_INET6: AF_INET;
// Arbitrary, not used in the output, but avoids duplicate results.
req.ai_socktype = SOCK_STREAM;
......@@ -304,7 +296,7 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_,
return 0;
}
int zmq::tcp_address_t::resolve_hostname (const char *hostname_, bool ipv4only_)
int zmq::tcp_address_t::resolve_hostname (const char *hostname_, bool ipv6_)
{
// Set up the query.
#if defined ZMQ_HAVE_OPENVMS && defined __ia64 && __INITIAL_POINTER_SIZE == 64
......@@ -316,7 +308,7 @@ int zmq::tcp_address_t::resolve_hostname (const char *hostname_, bool ipv4only_)
// Choose IPv4 or IPv6 protocol family. Note that IPv6 allows for
// IPv4-in-IPv6 addresses.
req.ai_family = ipv4only_ ? AF_INET : AF_INET6;
req.ai_family = ipv6_? AF_INET6: AF_INET;
// Need to choose one to avoid duplicate results from getaddrinfo() - this
// doesn't really matter, since it's not included in the addr-output.
......@@ -382,7 +374,7 @@ zmq::tcp_address_t::~tcp_address_t ()
{
}
int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv4only_)
int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv6_)
{
// Find the ':' at end that separates address from the port number.
const char *delimiter = strrchr (name_, ':');
......@@ -390,7 +382,6 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv4only_)
errno = EINVAL;
return -1;
}
// Separate the address/port.
std::string addr_str (name_, delimiter - name_);
std::string port_str (delimiter + 1);
......@@ -400,8 +391,8 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv4only_)
addr_str [addr_str.size () - 1] == ']')
addr_str = addr_str.substr (1, addr_str.size () - 2);
uint16_t port;
// Allow 0 specifically, to detect invalid port error in atoi if not
uint16_t port;
if (port_str == "*" || port_str == "0")
// Resolve wildcard to 0 to allow autoselection of port
port = 0;
......@@ -417,9 +408,9 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv4only_)
// Resolve the IP address.
int rc;
if (local_)
rc = resolve_interface (addr_str.c_str (), ipv4only_);
rc = resolve_interface (addr_str.c_str (), ipv6_);
else
rc = resolve_hostname (addr_str.c_str (), ipv4only_);
rc = resolve_hostname (addr_str.c_str (), ipv6_);
if (rc != 0)
return -1;
......@@ -434,7 +425,8 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv4only_)
int zmq::tcp_address_t::to_string (std::string &addr_)
{
if (address.generic.sa_family != AF_INET && address.generic.sa_family != AF_INET6) {
if (address.generic.sa_family != AF_INET
&& address.generic.sa_family != AF_INET6) {
addr_.clear ();
return -1;
}
......@@ -493,7 +485,7 @@ int zmq::tcp_address_mask_t::mask () const
return address_mask;
}
int zmq::tcp_address_mask_t::resolve (const char *name_, bool ipv4only_)
int zmq::tcp_address_mask_t::resolve (const char *name_, bool ipv6_)
{
// Find '/' at the end that separates address from the cidr mask number.
// Allow empty mask clause and threat it like '/32' for ipv4 or '/128' for ipv6.
......@@ -507,12 +499,11 @@ int zmq::tcp_address_mask_t::resolve (const char *name_, bool ipv4only_)
return -1;
}
}
else {
else
addr_str.assign (name_);
}
// Parse address part using standard routines.
int rc = tcp_address_t::resolve_hostname (addr_str.c_str (), ipv4only_);
int rc = tcp_address_t::resolve_hostname (addr_str.c_str (), ipv6_);
if (rc != 0)
return rc;
......
......@@ -45,8 +45,8 @@ namespace zmq
// This function translates textual TCP address into an address
// strcuture. If 'local' is true, names are resolved as local interface
// names. If it is false, names are resolved as remote hostnames.
// If 'ipv4only' is true, the name will never resolve to IPv6 address.
int resolve (const char* name_, bool local_, bool ipv4only_);
// If 'ipv6' is true, the name may resolve to IPv6 address.
int resolve (const char *name_, bool local_, bool ipv6_);
// The opposite to resolve()
virtual int to_string (std::string &addr_);
......@@ -60,10 +60,9 @@ namespace zmq
socklen_t addrlen () const;
protected:
int resolve_nic_name (const char *nic_, bool ipv4only_);
int resolve_interface (const char *interface_, bool ipv4only_);
int resolve_hostname (const char *hostname_, bool ipv4only_);
int resolve_nic_name (const char *nic_, bool ipv6_);
int resolve_interface (const char *interface_, bool ipv6_);
int resolve_hostname (const char *hostname_, bool ipv6_);
union {
sockaddr generic;
......@@ -75,13 +74,12 @@ namespace zmq
class tcp_address_mask_t : public tcp_address_t
{
public:
tcp_address_mask_t ();
// This function enhances tcp_address_t::resolve() with ability to parse
// additional cidr-like(/xx) mask value at the end of the name string.
// Works only with remote hostnames.
int resolve (const char* name_, bool ipv4only_);
int resolve (const char *name_, bool ipv6_);
// The opposite to resolve()
int to_string (std::string &addr_);
......@@ -91,7 +89,6 @@ namespace zmq
bool match_address (const struct sockaddr *ss, const socklen_t ss_len) const;
private:
int address_mask;
};
......
......@@ -148,7 +148,7 @@ int zmq::tcp_listener_t::get_address (std::string &addr_)
int zmq::tcp_listener_t::set_address (const char *addr_)
{
// Convert the textual address into address structure.
int rc = address.resolve (addr_, true, options.ipv4only ? true : false);
int rc = address.resolve (addr_, true, options.ipv6);
if (rc != 0)
return -1;
......@@ -160,8 +160,9 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
#endif
// IPv6 address family not supported, try automatic downgrade to IPv4.
if (address.family () == AF_INET6 && errno == EAFNOSUPPORT &&
!options.ipv4only) {
if (address.family () == AF_INET6
&& errno == EAFNOSUPPORT
&& options.ipv6) {
rc = address.resolve (addr_, true, true);
if (rc != 0)
return rc;
......
/*
Copyright (c) 2012 Ian Barber
Copyright (c) 2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Copyright (c) 2012 Ian Barber
Copyright (c) 2007-2013 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <errno.h>
#include <stdlib.h>
#include <string.h>
......@@ -31,12 +29,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
int main (void)
{
fprintf (stderr, "test_connect_delay running...\n");
int val;
int rc;
char buffer[16];
int seen = 0;
// TEST 1.
// First we're going to attempt to send messages to two
// pipes, one connected, the other not. We should see
......@@ -53,7 +48,7 @@ int main (void)
val = 0;
rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0);
rc = zmq_bind(to, "tcp://*:6555");
rc = zmq_bind (to, "tcp://*:6555");
assert (rc == 0);
// Create a socket pushing to two endpoints - only 1 message should arrive.
......@@ -61,36 +56,32 @@ int main (void)
assert(from);
val = 0;
zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val));
// This pipe will not connect
rc = zmq_connect (from, "tcp://localhost:5556");
assert (rc == 0);
// This pipe will
rc = zmq_connect (from, "tcp://localhost:5555");
rc = zmq_connect (from, "tcp://localhost:6555");
assert (rc == 0);
// We send 10 messages, 5 should just get stuck in the queue
// for the not-yet-connected pipe
for (int i = 0; i < 10; ++i)
{
std::string message("message ");
message += ('0' + i);
rc = zmq_send (from, message.data(), message.size(), 0);
assert(rc >= 0);
for (int i = 0; i < 10; ++i) {
rc = zmq_send (from, "Hello", 5, 0);
assert (rc == 5);
}
// Sleep to allow the messages to be delivered
zmq_sleep (1);
// We now consume from the connected pipe
// - we should see just 5
seen = 0;
for (int i = 0; i < 10; ++i)
{
memset (&buffer, 0, sizeof(buffer));
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
if( rc == -1)
break;
int timeout = 100;
rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0);
int seen = 0;
while (true) {
rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
if (rc == -1)
break; // Break when we didn't get a message
seen++;
}
assert (seen == 5);
......@@ -112,7 +103,6 @@ int main (void)
// cause the pipe attachment to be delayed until the connection
// succeeds.
context = zmq_ctx_new();
fprintf (stderr, " Rerunning with DELAY_ATTACH_ON_CONNECT\n");
// Bind the valid socket
to = zmq_socket (context, ZMQ_PULL);
......@@ -145,26 +135,21 @@ int main (void)
assert (rc == 0);
// Send 10 messages, all should be routed to the connected pipe
for (int i = 0; i < 10; ++i)
{
std::string message("message ");
message += ('0' + i);
rc = zmq_send (from, message.data(), message.size(), 0);
assert (rc >= 0);
for (int i = 0; i < 10; ++i) {
rc = zmq_send (from, "Hello", 5, 0);
assert (rc == 5);
}
rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0);
// Sleep to allow the messages to be delivered
zmq_sleep (1);
// Send 10 messages, all should arrive.
seen = 0;
for (int i = 0; i < 10; ++i)
{
memset(&buffer, 0, sizeof(buffer));
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
// If there is a failed delivery, assert!
assert (rc != -1);
while (true) {
rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
if (rc == -1)
break; // Break when we didn't get a message
seen++;
}
assert (seen == 10);
rc = zmq_close (from);
assert (rc == 0);
......@@ -178,83 +163,75 @@ int main (void)
// TEST 3
// This time we want to validate that the same blocking behaviour
// occurs with an existing connection that is broken. We will send
// messaages to a connected pipe, disconnect and verify the messages
// messages to a connected pipe, disconnect and verify the messages
// block. Then we reconnect and verify messages flow again.
context = zmq_ctx_new();
void *context2 = zmq_ctx_new();
fprintf (stderr, " Running DELAY_ATTACH_ON_CONNECT with disconnect\n");
context = zmq_ctx_new ();
to = zmq_socket (context2, ZMQ_PULL);
assert (to);
rc = zmq_bind (to, "tcp://*:5560");
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (backend);
void *frontend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
int zero = 0;
rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
val = 0;
rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val));
rc = zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
// Create a socket pushing
from = zmq_socket (context, ZMQ_PUSH);
assert (from);
val = 0;
rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
// Frontend connects to backend using DELAY_ATTACH_ON_CONNECT
int on = 1;
rc = zmq_setsockopt (frontend, ZMQ_DELAY_ATTACH_ON_CONNECT, &on, sizeof (on));
assert (rc == 0);
val = 1;
rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
rc = zmq_bind (backend, "tcp://*:5560");
assert (rc == 0);
// Connect to the valid socket socket
rc = zmq_connect (from, "tcp://localhost:5560");
rc = zmq_connect (frontend, "tcp://localhost:5560");
assert (rc == 0);
// Allow connections to stabilise
zmq_sleep(1);
// Ping backend to frontend so we know when the connection is up
rc = zmq_send (backend, "Hello", 5, 0);
assert (rc == 5);
rc = zmq_recv (frontend, buffer, 255, 0);
assert (rc == 5);
// Send a message, should succeed
std::string message("message ");
rc = zmq_send (from, message.data(), message.size(), 0);
assert (rc >= 0);
rc = zmq_close (to);
assert (rc == 0);
// Send message from frontend to backend
rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
assert (rc == 5);
rc = zmq_ctx_term(context2);
rc = zmq_close (backend);
assert (rc == 0);
// Give time to process disconnect
zmq_sleep(1);
// There's no way to do this except with a sleep
struct timespec t = { 0, 250 * 1000000 };
nanosleep (&t, NULL);
// Send a message, should fail
rc = zmq_send (from, message.data(), message.size(), ZMQ_DONTWAIT);
rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
assert (rc == -1);
context2 = zmq_ctx_new();
to = zmq_socket (context2, ZMQ_PULL);
assert (to);
rc = zmq_bind (to, "tcp://*:5560");
// Recreate backend socket
backend = zmq_socket (context, ZMQ_DEALER);
assert (backend);
rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
val = 0;
rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val));
rc = zmq_bind (backend, "tcp://*:5560");
assert (rc == 0);
// Allow connections to stabilise
zmq_sleep(1);
// Ping backend to frontend so we know when the connection is up
rc = zmq_send (backend, "Hello", 5, 0);
assert (rc == 5);
rc = zmq_recv (frontend, buffer, 255, 0);
assert (rc == 5);
// After the reconnect, should succeed
rc = zmq_send (from, message.data(), message.size(), 0);
assert (rc >= 0);
rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
assert (rc == 5);
rc = zmq_close (to);
rc = zmq_close (backend);
assert (rc == 0);
rc = zmq_close (from);
rc = zmq_close (frontend);
assert (rc == 0);
rc = zmq_ctx_term(context);
assert (rc == 0);
rc = zmq_ctx_term(context2);
assert (rc == 0);
}
......@@ -27,9 +27,7 @@
int main (void)
{
fprintf (stderr, "test_connect_resolve running...\n");
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
// Create pair of socket, each with high watermark of 2. Thus the total
......@@ -47,7 +45,7 @@ int main (void)
rc = zmq_close (sock);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
......
......@@ -28,28 +28,26 @@ int main(int argc, char** argv) {
size_t more_size = sizeof(more);
int iteration = 0;
while(1) {
while (1) {
zmq_pollitem_t items [] = {
{ subSocket, 0, ZMQ_POLLIN, 0 }, // read publications
{ pubSocket, 0, ZMQ_POLLIN, 0 }, // read subscriptions
};
zmq_poll(items, 2, 500);
int rc = zmq_poll (items, 2, 100);
if (items[1].revents & ZMQ_POLLIN) {
if (items [1].revents & ZMQ_POLLIN) {
while (1) {
zmq_msg_t msg;
zmq_msg_init (&msg);
zmq_msg_recv (&msg, pubSocket, 0);
int msgSize = zmq_msg_size(&msg);
char* buffer = (char*)zmq_msg_data(&msg);
if (buffer[0] == 0) {
assert(isSubscribed);
printf("unsubscribing from '%s'\n", strndup(buffer + 1, msgSize - 1));
isSubscribed = false;
} else {
}
else {
assert(!isSubscribed);
printf("subscribing on '%s'\n", strndup(buffer + 1, msgSize - 1));
isSubscribed = true;
}
......@@ -66,11 +64,6 @@ int main(int argc, char** argv) {
zmq_msg_t msg;
zmq_msg_init (&msg);
zmq_msg_recv (&msg, subSocket, 0);
int msgSize = zmq_msg_size(&msg);
char* buffer = (char*)zmq_msg_data(&msg);
printf("received on subscriber '%s'\n", strndup(buffer, msgSize));
zmq_getsockopt (subSocket, ZMQ_RCVMORE, &more, &more_size);
zmq_msg_close (&msg);
......@@ -80,34 +73,29 @@ int main(int argc, char** argv) {
}
}
}
if (iteration == 1) {
zmq_connect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_connect: %s\n", zmq_strerror(errno));
//zmq_connect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_connect: %s\n", zmq_strerror(errno));
}
if (iteration == 4) {
zmq_disconnect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_disconnect(%d): %s\n", errno, zmq_strerror(errno));
//zmq_disconnect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_disconnect: %s\n", zmq_strerror(errno));
}
if (iteration == 10) {
if (iteration > 4 && rc == 0)
break;
}
zmq_msg_t channelEnvlp;
ZMQ_PREPARE_STRING(channelEnvlp, "foo", 3);
zmq_sendmsg(pubSocket, &channelEnvlp, ZMQ_SNDMORE) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno));
zmq_msg_send (&channelEnvlp, pubSocket, ZMQ_SNDMORE) >= 0 || printf("zmq_msg_send: %s\n",zmq_strerror(errno));
zmq_msg_close(&channelEnvlp) && printf("zmq_msg_close: %s\n",zmq_strerror(errno));
zmq_msg_t message;
ZMQ_PREPARE_STRING(message, "this is foo!", 12);
zmq_sendmsg(pubSocket, &message, 0) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno));
zmq_msg_send (&message, pubSocket, 0) >= 0 || printf("zmq_msg_send: %s\n",zmq_strerror(errno));
zmq_msg_close(&message) && printf("zmq_msg_close: %s\n",zmq_strerror(errno));
iteration++;
}
assert(publicationsReceived == 3);
assert(!isSubscribed);
......
......@@ -19,14 +19,15 @@
*/
#include "../include/zmq.h"
#include <stdio.h>
#include "testutil.hpp"
#include <string.h>
#undef NDEBUG
#include <assert.h>
int main (void)
{
fprintf (stderr, "test_hwm running...\n");
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
// Create pair of socket, each with high watermark of 2. Thus the total
......@@ -76,7 +77,7 @@ int main (void)
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
......
......@@ -27,15 +27,16 @@
int main (void)
{
fprintf (stderr, "test_invalid_rep running...\n");
// Create REQ/ROUTER wiring.
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
void *router_socket = zmq_socket (ctx, ZMQ_ROUTER);
assert (router_socket);
void *req_socket = zmq_socket (ctx, ZMQ_REQ);
assert (req_socket);
int linger = 0;
int rc = zmq_setsockopt (router_socket, ZMQ_LINGER, &linger, sizeof (int));
assert (rc == 0);
......@@ -84,7 +85,7 @@ int main (void)
assert (rc == 0);
rc = zmq_close (req_socket);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
......
......@@ -29,26 +29,34 @@ static void do_bind_and_verify (void *s, const char *endpoint)
{
int rc = zmq_bind (s, endpoint);
assert (rc == 0);
char test [255];
size_t siz = 255;
rc = zmq_getsockopt (s, ZMQ_LAST_ENDPOINT, test, &siz);
assert (rc == 0 && strcmp (test, endpoint) == 0);
char reported [255];
size_t size = 255;
rc = zmq_getsockopt (s, ZMQ_LAST_ENDPOINT, reported, &size);
assert (rc == 0 && strcmp (reported, endpoint) == 0);
}
int main (void)
{
// Create the infrastructure
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_ROUTER);
assert (sb);
int val = 0;
int rc = zmq_setsockopt (sb, ZMQ_LINGER, &val, sizeof (val));
assert (rc == 0);
do_bind_and_verify (sb, "tcp://127.0.0.1:5560");
do_bind_and_verify (sb, "tcp://127.0.0.1:5561");
do_bind_and_verify (sb, "ipc:///tmp/testep");
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}
......@@ -20,7 +20,6 @@
*/
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <pthread.h>
#include <string.h>
#include "testutil.hpp"
......@@ -34,11 +33,9 @@ static int rep_socket_events;
const char *addr;
extern "C"
// REQ socket monitor thread
static void *req_socket_monitor (void *ctx)
{
// REQ socket monitor thread
static void *req_socket_monitor (void *ctx)
{
zmq_event_t event;
int rc;
......@@ -50,9 +47,11 @@ extern "C"
while (true) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0);
if (rc == -1 && zmq_errno() == ETERM) break;
rc = zmq_msg_recv (&msg, s, 0);
if (rc == -1 && zmq_errno() == ETERM)
break;
assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
switch (event.event) {
case ZMQ_EVENT_CONNECTED:
......@@ -85,14 +84,11 @@ extern "C"
}
zmq_close (s);
return NULL;
}
}
extern "C"
// 2nd REQ socket monitor thread
static void *req2_socket_monitor (void *ctx)
{
// 2nd REQ socket monitor thread
static void *req2_socket_monitor (void *ctx)
{
zmq_event_t event;
int rc;
......@@ -104,9 +100,11 @@ extern "C"
while (true) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0);
if (rc == -1 && zmq_errno() == ETERM) break;
rc = zmq_msg_recv (&msg, s, 0);
if (rc == -1 && zmq_errno() == ETERM)
break;
assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
switch (event.event) {
case ZMQ_EVENT_CONNECTED:
......@@ -123,15 +121,11 @@ extern "C"
}
zmq_close (s);
return NULL;
}
}
extern "C"
// REP socket monitor thread
static void *rep_socket_monitor (void *ctx)
{
// REP socket monitor thread
static void *rep_socket_monitor (void *ctx)
{
zmq_event_t event;
int rc;
......@@ -143,9 +137,11 @@ extern "C"
while (true) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0);
if (rc == -1 && zmq_errno() == ETERM) break;
rc = zmq_msg_recv (&msg, s, 0);
if (rc == -1 && zmq_errno() == ETERM)
break;
assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
switch (event.event) {
case ZMQ_EVENT_LISTENING:
......@@ -178,7 +174,6 @@ extern "C"
}
zmq_close (s);
return NULL;
}
}
int main (void)
......@@ -192,7 +187,7 @@ int main (void)
addr = "tcp://127.0.0.1:5560";
// Create the infrastructure
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
// REP socket
......@@ -230,6 +225,8 @@ int main (void)
rc = zmq_connect (req, addr);
assert (rc == 0);
bounce (rep, req);
// 2nd REQ socket
req2 = zmq_socket (ctx, ZMQ_REQ);
assert (req2);
......@@ -243,17 +240,13 @@ int main (void)
rc = zmq_connect (req2, addr);
assert (rc == 0);
bounce (rep, req);
// Allow a window for socket events as connect can be async
zmq_sleep (1);
// Close the REP socket
rc = zmq_close (rep);
assert (rc == 0);
// Allow some time for detecting error states
zmq_sleep (1);
struct timespec t = { 0, 250 * 1000000 };
nanosleep (&t, NULL);
// Close the REQ socket
rc = zmq_close (req);
......@@ -263,10 +256,7 @@ int main (void)
rc = zmq_close (req2);
assert (rc == 0);
// Allow for closed or disconnected events to bubble up
zmq_sleep (1);
zmq_term (ctx);
zmq_ctx_term (ctx);
// Expected REP socket events
assert (rep_socket_events & ZMQ_EVENT_LISTENING);
......
......@@ -28,14 +28,18 @@
int main (void)
{
// Create the infrastructure
void *ctx = zmq_init (0);
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_ROUTER);
assert (sb);
int rc = zmq_bind (sb, "inproc://a");
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_DEALER);
assert (sc);
rc = zmq_connect (sc, "inproc://a");
assert (rc == 0);
......@@ -49,29 +53,31 @@ int main (void)
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_recvmsg (sb, &msg, 0);
rc = zmq_msg_recv (&msg, sb, 0);
assert (rc >= 0);
int more = zmq_msg_get (&msg, ZMQ_MORE);
int more = zmq_msg_more (&msg);
assert (more == 1);
// Then the first part of the message body.
rc = zmq_recvmsg (sb, &msg, 0);
rc = zmq_msg_recv (&msg, sb, 0);
assert (rc == 1);
more = zmq_msg_get (&msg, ZMQ_MORE);
more = zmq_msg_more (&msg);
assert (more == 1);
// And finally, the second part of the message body.
rc = zmq_recvmsg (sb, &msg, 0);
rc = zmq_msg_recv (&msg, sb, 0);
assert (rc == 1);
more = zmq_msg_get (&msg, ZMQ_MORE);
more = zmq_msg_more (&msg);
assert (more == 0);
// Deallocate the infrastructure.
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}
......
......@@ -23,9 +23,7 @@
int main (void)
{
fprintf (stderr, "test_pair_inproc running...\n");
void *ctx = zmq_init (0);
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_PAIR);
......@@ -46,7 +44,7 @@ int main (void)
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
......
......@@ -23,9 +23,7 @@
int main (void)
{
fprintf (stderr, "test_pair_ipc running...\n");
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_PAIR);
......@@ -46,7 +44,7 @@ int main (void)
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
......
......@@ -24,9 +24,7 @@
int main (void)
{
fprintf (stderr, "test_pair_tcp running...\n");
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_PAIR);
......@@ -47,7 +45,7 @@ int main (void)
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
......
/*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2011 250bpm s.r.o.
Copyright (c) 2007-2013 iMatix Corporation
Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
......@@ -19,239 +18,127 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <pthread.h>
#include <netinet/tcp.h>
#include "../include/zmq.h"
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#undef NDEBUG
#include <assert.h>
#include <fcntl.h>
#include <zmq.h>
#include <unistd.h>
#include <poll.h>
//ToDo: Windows?
const char *test_str = "TEST-STRING";
// ZMTP protocol greeting structure
int tcp_client ()
{
struct sockaddr_in serv_addr;
struct hostent *server;
const int portno = 5555;
typedef unsigned char byte;
typedef struct {
byte signature [10]; // 0xFF 8*0x00 0x7F
byte revision; // 0x01 = ZMTP/2.0
byte socktype; // Defined in ZMTP spec
byte identity [2]; // Empty message
} zmtp_greeting_t;
int sockfd = socket (AF_INET, SOCK_STREAM, 0);
assert (sockfd >= 0);
server = gethostbyname ("localhost");
assert (server);
#define ZMTP_DEALER 5 // Socket type constants
#define ZMTP_ROUTER 6
memset (&serv_addr, 0, sizeof serv_addr);
serv_addr.sin_family = AF_INET;
memmove (&serv_addr.sin_addr.s_addr, server->h_addr, server->h_length);
serv_addr.sin_port = htons (portno);
int rc = connect (sockfd, (struct sockaddr *) &serv_addr, sizeof serv_addr);
assert (rc == 0);
int nodelay = 1;
rc = setsockopt (sockfd, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay,
sizeof nodelay);
assert (rc == 0);
// This is a greeting matching what 0MQ will send us; note the
// 8-byte size is set to 1 for backwards compatibility
return sockfd;
}
static zmtp_greeting_t greeting
= { { 0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F }, 1, 0, { 0, 0 } };
int tcp_server ()
int main (void)
{
int listenfd = socket (AF_INET, SOCK_STREAM, 0);
assert (listenfd != -1);
int flag = 1;
int rc = setsockopt (listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof flag);
assert (rc == 0);
struct sockaddr_in serv_addr;
memset (&serv_addr, 0, sizeof serv_addr);
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl (INADDR_ANY);
serv_addr.sin_port = htons (5555);
rc = bind (listenfd, (struct sockaddr *) &serv_addr, sizeof serv_addr);
assert (rc == 0);
rc = listen (listenfd, 8);
assert (rc == 0);
int sockfd = accept (listenfd, NULL, NULL);
assert (sockfd != -1);
rc = close (listenfd);
assert (rc == 0);
int flags = fcntl (sockfd, F_GETFL, 0);
if (flags == -1)
flags = 0;
rc = fcntl (sockfd, F_SETFL, flags | O_NONBLOCK);
assert (rc != -1);
return sockfd;
}
void tcp_client_write (int sockfd, const void *buf, int buf_len)
{
assert (buf);
int n = write (sockfd, buf, buf_len);
assert (n >= 0);
}
void tcp_client_read (int sockfd)
{
struct timeval tm;
tm.tv_sec = 1;
tm.tv_usec = 0;
fd_set r;
char buffer [16];
FD_ZERO (&r);
FD_SET (sockfd, &r);
int sr = select (sockfd + 1, &r, NULL, NULL, &tm);
assert (sr > 0);
int n = read (sockfd, buffer, 16);
assert (n > 0);
assert (memcmp (buffer, test_str, strlen (test_str)) == 0);
}
int rc;
size_t tcp_read (int s, char *buf, size_t bufsize)
{
size_t bytes_read = 0;
struct pollfd pfd = {s, POLLIN};
int rc = poll (&pfd, 1, 100);
while (rc > 0 && bytes_read < bufsize) {
int n = read (s, buf + bytes_read, bufsize - bytes_read);
if (n <= 0)
return bytes_read;
bytes_read += n;
rc = poll (&pfd, 1, 100);
}
return bytes_read;
}
void tcp_client_close (int sockfd)
{
close (sockfd);
}
void test_zmq_connect ()
{
void *ctx = zmq_init (1);
// Set up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);
void *zs = zmq_socket (ctx, ZMQ_ROUTER);
assert (zs);
// We'll be using this socket in raw mode
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert (router);
int rc = zmq_setsockopt (zs, ZMQ_IDENTITY, "X", 1);
int on = 1;
rc = zmq_setsockopt (router, ZMQ_ROUTER_RAW, &on, sizeof (on));
assert (rc == 0);
int raw_sock = 1;
rc = zmq_setsockopt (zs, ZMQ_ROUTER_RAW, &raw_sock, sizeof raw_sock);
int zero = 0;
rc = zmq_setsockopt (router, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_bind (router, "tcp://*:5555");
assert (rc == 0);
rc = zmq_connect (zs, "tcp://127.0.0.1:5555");
// We'll be using this socket as the other peer
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
rc = zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_connect (dealer, "tcp://localhost:5555");
int i;
for (i = 0; i < 8; i++) {
int server_fd = tcp_server ();
assert (server_fd != -1);
// Send a message on the dealer socket
rc = zmq_send (dealer, "Hello", 5, 0);
assert (rc == 5);
zmq_msg_t msg;
rc = zmq_msg_init_size (&msg, strlen (test_str));
// First frame is identity
zmq_msg_t identity;
rc = zmq_msg_init (&identity);
assert (rc == 0);
memcpy (zmq_msg_data (&msg), test_str, strlen (test_str));
rc = zmq_msg_send (&msg, zs, 0);
rc = zmq_msg_recv (&identity, router, 0);
assert (rc > 0);
assert (zmq_msg_more (&identity));
char buffer [128];
size_t bytes_read = tcp_read (server_fd, buffer, sizeof buffer);
// Second frame is greeting signature
byte buffer [255];
rc = zmq_recv (router, buffer, 255, 0);
assert (rc == 10);
assert (memcmp (buffer, greeting.signature, 10) == 0);
assert (bytes_read == strlen (test_str)
&& memcmp (buffer, test_str, bytes_read) == 0);
// Send our own protocol greeting
rc = zmq_msg_send (&identity, router, ZMQ_SNDMORE);
assert (rc > 0);
greeting.socktype = ZMTP_ROUTER;
rc = zmq_send (router, &greeting, sizeof (greeting), 0);
assert (rc == sizeof (greeting));
rc = close (server_fd);
assert (rc == 0);
}
// Now we expect the data from the DEALER socket
// First frame is, again, the identity of the connection
rc = zmq_msg_recv (&identity, router, 0);
assert (rc > 0);
assert (zmq_msg_more (&identity));
rc = zmq_close (zs);
assert (rc == 0);
// Second frame contains all remaining data from DEALER
rc = zmq_recv (router, buffer, 255, 0);
assert (rc == 11);
rc = zmq_term (ctx);
assert (rc == 0);
}
// First four bytes are [revision][socktype][identity]
assert (buffer [0] == 1); // Revision = 1
assert (buffer [1] == ZMTP_DEALER);
int main ()
{
fprintf (stderr, "test_raw_sock running...\n");
// Identity is 2 byte message
assert (buffer [2] == 0); // Flags = 0
assert (buffer [3] == 0); // Size = 0
zmq_msg_t message;
zmq_msg_t id;
// Then we have a 5-byte message "Hello"
assert (buffer [4] == 0); // Flags = 0
assert (buffer [5] == 5); // Size = 5
assert (memcmp (buffer + 6, "Hello", 5) == 0);
//===================
void *ctx = zmq_init (1);
assert (ctx);
// Send "World" back to DEALER
rc = zmq_msg_send (&identity, router, ZMQ_SNDMORE);
assert (rc > 0);
byte world [] = { 0, 5, 'W', 'o', 'r', 'l', 'd' };
rc = zmq_send (router, world, sizeof (world), 0);
assert (rc == sizeof (world));
void *sb = zmq_socket (ctx, ZMQ_ROUTER);
assert (sb);
// Expect response on DEALER socket
rc = zmq_recv (dealer, buffer, 255, 0);
assert (rc == 5);
assert (memcmp (buffer, "World", 5) == 0);
int raw_sock = 1;
int rc = zmq_setsockopt (sb, ZMQ_ROUTER_RAW, &raw_sock, sizeof raw_sock);
rc = zmq_close (dealer);
assert (rc == 0);
rc = zmq_bind (sb, "tcp://127.0.0.1:5555");
assert (rc == 0);
int sock_fd = tcp_client ();
assert (sock_fd >= 0);
// ===================
zmq_msg_init (&message);
zmq_msg_init (&id);
rc = zmq_close (router);
assert (rc == 0);
zmq_pollitem_t items [] = {
{ sb, 0, ZMQ_POLLIN, 0 },
};
tcp_client_write (sock_fd, test_str, strlen (test_str));
zmq_poll (items, 1, 500);
assert (items [0].revents & ZMQ_POLLIN);
int n = zmq_msg_recv (&id, sb, 0);
assert (n > 0);
n = zmq_msg_recv (&message, sb, 0);
assert (n > 0);
assert (memcmp (zmq_msg_data (&message), test_str, strlen (test_str)) == 0);
zmq_msg_send (&id, sb, ZMQ_SNDMORE);
zmq_msg_send (&message, sb, ZMQ_SNDMORE); // SNDMORE option is ignored
tcp_client_read (sock_fd);
tcp_client_close (sock_fd);
zmq_msg_close (&id);
zmq_msg_close (&message);
zmq_close (sb);
zmq_term (ctx);
test_zmq_connect ();
fprintf (stderr, "test_raw_sock PASSED.\n");
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
}
......@@ -28,9 +28,7 @@
int main (void)
{
fprintf (stderr, "test_reqrep_device running...\n");
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
// Create a req/rep device.
......@@ -66,13 +64,13 @@ int main (void)
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_recvmsg (router, &msg, 0);
rc = zmq_msg_recv (&msg, router, 0);
assert (rc >= 0);
int rcvmore;
size_t sz = sizeof (rcvmore);
rc = zmq_getsockopt (router, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
rc = zmq_sendmsg (dealer, &msg, rcvmore ? ZMQ_SNDMORE : 0);
rc = zmq_msg_send (&msg, dealer, rcvmore? ZMQ_SNDMORE: 0);
assert (rc >= 0);
}
......@@ -104,12 +102,12 @@ int main (void)
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_recvmsg (dealer, &msg, 0);
rc = zmq_msg_recv (&msg, dealer, 0);
assert (rc >= 0);
int rcvmore;
rc = zmq_getsockopt (dealer, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
rc = zmq_sendmsg (router, &msg, rcvmore ? ZMQ_SNDMORE : 0);
rc = zmq_msg_send (&msg, router, rcvmore? ZMQ_SNDMORE: 0);
assert (rc >= 0);
}
......@@ -136,7 +134,7 @@ int main (void)
assert (rc == 0);
rc = zmq_close (dealer);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
......
......@@ -23,9 +23,7 @@
int main (void)
{
fprintf (stderr, "test_reqrep_inproc running...\n");
void *ctx = zmq_init (0);
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_REP);
......@@ -46,7 +44,7 @@ int main (void)
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
......
......@@ -23,9 +23,7 @@
int main (void)
{
fprintf (stderr, "test_reqrep_ipc running...\n");
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_REP);
......@@ -46,7 +44,7 @@ int main (void)
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
......
......@@ -24,9 +24,7 @@
int main (void)
{
fprintf (stderr, "test_reqrep_tcp running...\n");
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_REP);
......@@ -47,7 +45,7 @@ int main (void)
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
......
/*
Copyright (c) 2010-2011 250bpm s.r.o.
Copyright (c) 2011 iMatix Corporation
Copyright (c) 2007-2013 iMatix Corporation
Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
......@@ -19,86 +18,67 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <stdio.h>
#include "testutil.hpp"
#include "../include/zmq_utils.h"
#include <string.h>
#undef NDEBUG
#include <assert.h>
int main (void)
{
fprintf (stderr, "test_router_mandatory running...\n");
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert (router);
// Creating the first socket.
void *sa = zmq_socket (ctx, ZMQ_ROUTER);
assert (sa);
int hwm = 1;
int rc = zmq_setsockopt (sa, ZMQ_SNDHWM, &hwm, sizeof (hwm));
int rc = zmq_bind (router, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_bind (sa, "tcp://127.0.0.1:15560");
assert (rc == 0);
// Sending a message to an unknown peer with the default setting
rc = zmq_send (sa, "UNKNOWN", 7, ZMQ_SNDMORE);
// Send a message to an unknown peer with the default setting
// This will not report any error
rc = zmq_send (router, "UNKNOWN", 7, ZMQ_SNDMORE);
assert (rc == 7);
rc = zmq_send (sa, "DATA", 4, 0);
rc = zmq_send (router, "DATA", 4, 0);
assert (rc == 4);
// Send a message to an unknown peer with mandatory routing
// This will fail
int mandatory = 1;
// Set mandatory routing on socket
rc = zmq_setsockopt (sa, ZMQ_ROUTER_MANDATORY, &mandatory, sizeof (mandatory));
rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &mandatory, sizeof (mandatory));
assert (rc == 0);
// Send a message and check that it fails
rc = zmq_send (sa, "UNKNOWN", 7, ZMQ_SNDMORE | ZMQ_DONTWAIT);
rc = zmq_send (router, "UNKNOWN", 7, ZMQ_SNDMORE);
assert (rc == -1 && errno == EHOSTUNREACH);
// Create a valid socket
void *sb = zmq_socket (ctx, ZMQ_DEALER);
assert (sb);
rc = zmq_setsockopt (sb, ZMQ_RCVHWM, &hwm, sizeof (hwm));
// Create dealer called "X" and connect it to our router
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
rc = zmq_setsockopt (dealer, ZMQ_IDENTITY, "X", 1);
assert (rc == 0);
rc = zmq_setsockopt (sb, ZMQ_IDENTITY, "X", 1);
rc = zmq_connect (dealer, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_connect (sb, "tcp://127.0.0.1:15560");
assert (rc == 0);
// wait until connect
zmq_sleep (1);
// make it full and check that it fails
rc = zmq_send (sa, "X", 1, ZMQ_SNDMORE);
assert (rc == 1);
rc = zmq_send (sa, "DATA1", 5, 0);
// Get message from dealer to know when connection is ready
char buffer [255];
rc = zmq_send (dealer, "Hello", 5, 0);
assert (rc == 5);
rc = zmq_recv (router, buffer, 255, 0);
assert (rc == 1);
assert (buffer [0] == 'X');
rc = zmq_send (sa, "X", 1, ZMQ_SNDMORE | ZMQ_DONTWAIT);
if (rc == 1) {
// the first frame has been sent
rc = zmq_send (sa, "DATA2", 5, 0);
// Send a message to connected dealer now
// It should work
rc = zmq_send (router, "X", 1, ZMQ_SNDMORE);
assert (rc == 1);
rc = zmq_send (router, "Hello", 5, 0);
assert (rc == 5);
// send more
rc = zmq_send (sa, "X", 1, ZMQ_SNDMORE | ZMQ_DONTWAIT);
}
assert (rc == -1 && errno == EAGAIN);
rc = zmq_close (sa);
rc = zmq_close (router);
assert (rc == 0);
rc = zmq_close (sb);
rc = zmq_close (dealer);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
......
......@@ -48,7 +48,6 @@ extern "C"
int main (void)
{
void *ctx;
void *s1;
void *s2;
int i;
......@@ -56,13 +55,12 @@ int main (void)
int rc;
pthread_t threads [THREAD_COUNT];
fprintf (stderr, "test_shutdown_stress running...\n");
for (j = 0; j != 10; j++) {
// Check the shutdown with many parallel I/O threads.
ctx = zmq_init (7);
void *ctx = zmq_ctx_new ();
assert (ctx);
zmq_ctx_set (ctx, ZMQ_IO_THREADS, 7);
s1 = zmq_socket (ctx, ZMQ_PUB);
assert (s1);
......@@ -85,7 +83,7 @@ int main (void)
rc = zmq_close (s1);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
}
......
......@@ -20,20 +20,17 @@
*/
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <stdio.h>
#include <time.h>
#undef NDEBUG
#include <assert.h>
int main (void)
{
fprintf (stderr, "test_sub_forward running...\n");
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
// First, create an intermediate device.
// First, create an intermediate device
void *xpub = zmq_socket (ctx, ZMQ_XPUB);
assert (xpub);
int rc = zmq_bind (xpub, "tcp://127.0.0.1:5560");
......@@ -43,13 +40,13 @@ int main (void)
rc = zmq_bind (xsub, "tcp://127.0.0.1:5561");
assert (rc == 0);
// Create a publisher.
// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_PUB);
assert (pub);
rc = zmq_connect (pub, "tcp://127.0.0.1:5561");
assert (rc == 0);
// Create a subscriber.
// Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_SUB);
assert (sub);
rc = zmq_connect (sub, "tcp://127.0.0.1:5560");
......@@ -59,27 +56,28 @@ int main (void)
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
// Pass the subscription upstream through the device.
// Pass the subscription upstream through the device
char buff [32];
rc = zmq_recv (xpub, buff, sizeof (buff), 0);
assert (rc >= 0);
rc = zmq_send (xsub, buff, rc, 0);
assert (rc >= 0);
// Wait a bit till the subscription gets to the publisher.
zmq_sleep (1);
// Wait a bit till the subscription gets to the publisher
struct timespec t = { 0, 250 * 1000000 };
nanosleep (&t, NULL);
// Send an empty message.
// Send an empty message
rc = zmq_send (pub, NULL, 0, 0);
assert (rc == 0);
// Pass the message downstream through the device.
// Pass the message downstream through the device
rc = zmq_recv (xsub, buff, sizeof (buff), 0);
assert (rc >= 0);
rc = zmq_send (xpub, buff, rc, 0);
assert (rc >= 0);
// Receive the message in the subscriber.
// Receive the message in the subscriber
rc = zmq_recv (sub, buff, sizeof (buff), 0);
assert (rc == 0);
......@@ -92,7 +90,7 @@ int main (void)
assert (rc == 0);
rc = zmq_close (sub);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
......
......@@ -20,9 +20,9 @@
*/
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <string.h>
#include <unistd.h>
#include <time.h>
#undef NDEBUG
#include <assert.h>
......@@ -33,10 +33,8 @@ int main (void)
char buf[32];
const char *ep = "tcp://127.0.0.1:5560";
fprintf (stderr, "unbind endpoint test running...\n");
// Create infrastructure.
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
void *push = zmq_socket (ctx, ZMQ_PUSH);
assert (push);
......@@ -47,38 +45,34 @@ int main (void)
rc = zmq_connect (pull, ep);
assert (rc == 0);
// Pass one message through to ensure the connection is established.
// Pass one message through to ensure the connection is established
rc = zmq_send (push, "ABC", 3, 0);
assert (rc == 3);
rc = zmq_recv (pull, buf, sizeof (buf), 0);
assert (rc == 3);
// Unbind the lisnening endpoint
// Unbind the listening endpoint
rc = zmq_unbind (push, ep);
assert (rc == 0);
// Let events some time
zmq_sleep (1);
// Allow unbind to settle
struct timespec t = { 0, 250 * 1000000 };
nanosleep (&t, NULL);
// Check that sending would block (there's no outbound connection).
// Check that sending would block (there's no outbound connection)
rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);
assert (rc == -1 && zmq_errno () == EAGAIN);
// Clean up.
// Clean up
rc = zmq_close (pull);
assert (rc == 0);
rc = zmq_close (push);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
// Now the other way round.
fprintf (stderr, "disconnect endpoint test running...\n");
// Create infrastructure.
ctx = zmq_init (1);
// Create infrastructure
ctx = zmq_ctx_new ();
assert (ctx);
push = zmq_socket (ctx, ZMQ_PUSH);
assert (push);
......@@ -99,8 +93,8 @@ int main (void)
rc = zmq_disconnect (push, ep);
assert (rc == 0);
// Let events some time
zmq_sleep (1);
// Allow disconnect to settle
nanosleep (&t, NULL);
// Check that sending would block (there's no inbound connections).
rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);
......@@ -111,7 +105,7 @@ int main (void)
assert (rc == 0);
rc = zmq_close (push);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
......
/*
Copyright (c) 2007-2013 iMatix Corporation
Copyright (c) 2010-2011 250bpm s.r.o.
Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
......@@ -19,101 +20,69 @@
*/
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <pthread.h>
#include <sys/time.h>
#include <stdio.h>
#include <string.h>
#undef NDEBUG
#include <assert.h>
extern "C"
{
void *worker(void *ctx)
{
// Worker thread connects after delay of 1 second. Then it waits
// for 1 more second, so that async connect has time to succeed.
zmq_sleep (1);
void *sc = zmq_socket (ctx, ZMQ_PUSH);
assert (sc);
int rc = zmq_connect (sc, "inproc://timeout_test");
assert (rc == 0);
zmq_sleep (1);
rc = zmq_close (sc);
assert (rc == 0);
return NULL;
}
}
int main (void)
{
fprintf (stderr, "test_timeo running...\n");
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
// Create a disconnected socket.
void *sb = zmq_socket (ctx, ZMQ_PULL);
assert (sb);
int rc = zmq_bind (sb, "inproc://timeout_test");
void *frontend = zmq_socket (ctx, ZMQ_DEALER);
assert (frontend);
int rc = zmq_bind (frontend, "inproc://timeout_test");
assert (rc == 0);
// Check whether non-blocking recv returns immediately.
char buf [] = "12345678ABCDEFGH12345678abcdefgh";
rc = zmq_recv (sb, buf, 32, ZMQ_DONTWAIT);
// Receive on disconnected socket returns immediately
char buffer [32];
rc = zmq_recv (frontend, buffer, 32, ZMQ_DONTWAIT);
assert (rc == -1);
assert (zmq_errno() == EAGAIN);
// Check whether recv timeout is honoured.
int timeout = 500;
size_t timeout_size = sizeof timeout;
rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
// Check whether receive timeout is honored
int timeout = 250;
rc = zmq_setsockopt (frontend, ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0);
void *watch = zmq_stopwatch_start ();
rc = zmq_recv (sb, buf, 32, 0);
assert (rc == -1);
assert (zmq_errno () == EAGAIN);
unsigned long elapsed = zmq_stopwatch_stop (watch);
assert (elapsed > 440000 && elapsed < 550000);
// Check whether connection during the wait doesn't distort the timeout.
timeout = 2000;
rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
assert (rc == 0);
pthread_t thread;
rc = pthread_create (&thread, NULL, worker, ctx);
assert (rc == 0);
watch = zmq_stopwatch_start ();
rc = zmq_recv (sb, buf, 32, 0);
struct timeval before, after;
gettimeofday (&before, NULL);
rc = zmq_recv (frontend, buffer, 32, 0);
assert (rc == -1);
assert (zmq_errno () == EAGAIN);
elapsed = zmq_stopwatch_stop (watch);
assert (elapsed > 1900000 && elapsed < 2100000);
rc = pthread_join (thread, NULL);
assert (rc == 0);
gettimeofday (&after, NULL);
// Check that timeouts don't break normal message transfer.
void *sc = zmq_socket (ctx, ZMQ_PUSH);
assert (sc);
rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
assert (rc == 0);
rc = zmq_setsockopt(sb, ZMQ_SNDTIMEO, &timeout, timeout_size);
long elapsed = (long)
((after.tv_sec * 1000 + after.tv_usec / 1000)
- (before.tv_sec * 1000 + before.tv_usec / 1000));
assert (elapsed > 200 && elapsed < 300);
// Check that normal message flow works as expected
void *backend = zmq_socket (ctx, ZMQ_DEALER);
assert (backend);
rc = zmq_connect (backend, "inproc://timeout_test");
assert (rc == 0);
rc = zmq_connect (sc, "inproc://timeout_test");
rc = zmq_setsockopt (backend, ZMQ_SNDTIMEO, &timeout, sizeof (int));
assert (rc == 0);
rc = zmq_send (sc, buf, 32, 0);
assert (rc == 32);
rc = zmq_recv (sb, buf, 32, 0);
assert (rc == 32);
// Clean-up.
rc = zmq_close (sc);
rc = zmq_send (backend, "Hello", 5, 0);
assert (rc == 5);
rc = zmq_recv (frontend, buffer, 32, 0);
assert (rc == 5);
// Clean-up
rc = zmq_close (backend);
assert (rc == 0);
rc = zmq_close (sb);
rc = zmq_close (frontend);
assert (rc == 0);
rc = zmq_term (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}
......@@ -24,11 +24,11 @@
#include "../include/zmq.h"
#include <string.h>
#undef NDEBUG
#include <assert.h>
inline void bounce (void *sb, void *sc)
static void
bounce (void *sb, void *sc)
{
const char *content = "12345678ABCDEFGH12345678abcdefgh";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment