Commit 25bf30be authored by Luca Boccassi's avatar Luca Boccassi

Problem: 2 connects with same sourceip:port to different destip:port fail

Solution: during a connect with a TCP endpoint if a source address is
passed set the SO_REUSEADDR flag on the socket before the bind system
call.
Add unit test to cover this case for both IPv4 and IPv6.
parent 669ff41d
......@@ -314,6 +314,18 @@ int zmq::tcp_connecter_t::open ()
// Set a source address for conversations
if (tcp_addr->has_src_addr ()) {
// Allow reusing of the address, to connect to different servers
// using the same source port on the client.
int flag = 1;
#ifdef ZMQ_HAVE_WINDOWS
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (const char*) &flag,
sizeof (int));
wsa_assert (rc != SOCKET_ERROR);
#else
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0);
#endif
rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
if (rc == -1)
return -1;
......
......@@ -131,6 +131,73 @@ void test_multi_connect_ipv4 (void)
assert (rc == 0);
}
void test_multi_connect_ipv4_same_port (void)
{
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb0 = zmq_socket (ctx, ZMQ_REP);
assert (sb0);
int rc = zmq_bind (sb0, "tcp://127.0.0.1:5560");
assert (rc == 0);
void *sb1 = zmq_socket (ctx, ZMQ_REP);
assert (sb1);
rc = zmq_bind (sb1, "tcp://127.0.0.1:5561");
assert (rc == 0);
void *sc0 = zmq_socket (ctx, ZMQ_REQ);
assert (sc0);
rc = zmq_connect (sc0, "tcp://127.0.0.1:5564;127.0.0.1:5560");
assert (rc == 0);
rc = zmq_connect (sc0, "tcp://127.0.0.1:5565;127.0.0.1:5561");
assert (rc == 0);
void *sc1 = zmq_socket (ctx, ZMQ_REQ);
assert (sc1);
rc = zmq_connect (sc1, "tcp://127.0.0.1:5565;127.0.0.1:5560");
assert (rc == 0);
rc = zmq_connect (sc1, "tcp://127.0.0.1:5564;127.0.0.1:5561");
assert (rc == 0);
bounce (sb0, sc0);
bounce (sb1, sc0);
bounce (sb0, sc1);
bounce (sb1, sc1);
bounce (sb0, sc0);
bounce (sb1, sc0);
rc = zmq_disconnect (sc1, "tcp://127.0.0.1:5565;127.0.0.1:5560");
assert (rc == 0);
rc = zmq_disconnect (sc1, "tcp://127.0.0.1:5564;127.0.0.1:5561");
assert (rc == 0);
rc = zmq_disconnect (sc0, "tcp://127.0.0.1:5564;127.0.0.1:5560");
assert (rc == 0);
rc = zmq_disconnect (sc0, "tcp://127.0.0.1:5565;127.0.0.1:5561");
assert (rc == 0);
rc = zmq_unbind (sb0, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_unbind (sb1, "tcp://127.0.0.1:5561");
assert (rc == 0);
rc = zmq_close (sc0);
assert (rc == 0);
rc = zmq_close (sc1);
assert (rc == 0);
rc = zmq_close (sb0);
assert (rc == 0);
rc = zmq_close (sb1);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
}
void test_single_connect_ipv6 (void)
{
void *ctx = zmq_ctx_new ();
......@@ -257,6 +324,87 @@ void test_multi_connect_ipv6 (void)
assert (rc == 0);
}
void test_multi_connect_ipv6_same_port (void)
{
void *ctx = zmq_ctx_new ();
assert (ctx);
if (!is_ipv6_available ()) {
zmq_ctx_term (ctx);
return;
}
void *sb0 = zmq_socket (ctx, ZMQ_REP);
assert (sb0);
int ipv6 = 1;
int rc = zmq_setsockopt (sb0, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_bind (sb0, "tcp://[::1]:5560");
assert (rc == 0);
void *sb1 = zmq_socket (ctx, ZMQ_REP);
assert (sb1);
rc = zmq_setsockopt (sb1, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_bind (sb1, "tcp://[::1]:5561");
assert (rc == 0);
void *sc0 = zmq_socket (ctx, ZMQ_REQ);
assert (sc0);
rc = zmq_setsockopt (sc0, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_connect (sc0, "tcp://[::1]:5564;[::1]:5560");
assert (rc == 0);
rc = zmq_connect (sc0, "tcp://[::1]:5565;[::1]:5561");
assert (rc == 0);
void *sc1 = zmq_socket (ctx, ZMQ_REQ);
assert (sc1);
rc = zmq_setsockopt (sc1, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_connect (sc1, "tcp://[::1]:5565;[::1]:5560");
assert (rc == 0);
rc = zmq_connect (sc1, "tcp://[::1]:5564;[::1]:5561");
assert (rc == 0);
bounce (sb0, sc0);
bounce (sb1, sc0);
bounce (sb0, sc1);
bounce (sb1, sc1);
bounce (sb0, sc0);
bounce (sb1, sc0);
rc = zmq_disconnect (sc1, "tcp://[::1]:5565;[::1]:5560");
assert (rc == 0);
rc = zmq_disconnect (sc1, "tcp://[::1]:5564;[::1]:5561");
assert (rc == 0);
rc = zmq_disconnect (sc0, "tcp://[::1]:5564;[::1]:5560");
assert (rc == 0);
rc = zmq_disconnect (sc0, "tcp://[::1]:5565;[::1]:5561");
assert (rc == 0);
rc = zmq_unbind (sb0, "tcp://[::1]:5560");
assert (rc == 0);
rc = zmq_unbind (sb1, "tcp://[::1]:5561");
assert (rc == 0);
rc = zmq_close (sc0);
assert (rc == 0);
rc = zmq_close (sc1);
assert (rc == 0);
rc = zmq_close (sb0);
assert (rc == 0);
rc = zmq_close (sb1);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
}
int main (void)
{
setup_test_environment ();
......@@ -265,9 +413,13 @@ int main (void)
test_multi_connect_ipv4 ();
test_multi_connect_ipv4_same_port ();
test_single_connect_ipv6 ();
test_multi_connect_ipv6 ();
test_multi_connect_ipv6_same_port ();
return 0 ;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment