Unverified Commit b8b1b8de authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3384 from sigiesec/remove-socks-tcp-code-duplication

Remove socks/tcp code duplication
parents 69a65227 e5832763
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "err.hpp" #include "err.hpp"
#include "macros.hpp" #include "macros.hpp"
#include "config.hpp" #include "config.hpp"
#include "address.hpp"
#if !defined ZMQ_HAVE_WINDOWS #if !defined ZMQ_HAVE_WINDOWS
#include <fcntl.h> #include <fcntl.h>
...@@ -145,39 +146,28 @@ void zmq::enable_ipv4_mapping (fd_t s_) ...@@ -145,39 +146,28 @@ void zmq::enable_ipv4_mapping (fd_t s_)
int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_) int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
{ {
int rc;
struct sockaddr_storage ss; struct sockaddr_storage ss;
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_WINDOWS \ zmq_socklen_t addrlen =
|| defined ZMQ_HAVE_VXWORKS get_socket_address (sockfd_, socket_end_remote, &ss);
int addrlen = static_cast<int> (sizeof ss);
#else if (addrlen == 0) {
socklen_t addrlen = sizeof ss;
#endif
rc = getpeername (sockfd_, reinterpret_cast<struct sockaddr *> (&ss),
&addrlen);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) {
const int last_error = WSAGetLastError (); const int last_error = WSAGetLastError ();
wsa_assert (last_error != WSANOTINITIALISED && last_error != WSAEFAULT wsa_assert (last_error != WSANOTINITIALISED && last_error != WSAEFAULT
&& last_error != WSAEINPROGRESS && last_error != WSAEINPROGRESS
&& last_error != WSAENOTSOCK); && last_error != WSAENOTSOCK);
return 0; #elif !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
}
#else
if (rc == -1) {
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
errno_assert (errno != EBADF && errno != EFAULT && errno != ENOTSOCK); errno_assert (errno != EBADF && errno != EFAULT && errno != ENOTSOCK);
#else #else
errno_assert (errno != EFAULT && errno != ENOTSOCK); errno_assert (errno != EFAULT && errno != ENOTSOCK);
#endif #endif
return 0; return 0;
} }
#endif
char host[NI_MAXHOST]; char host[NI_MAXHOST];
rc = getnameinfo (reinterpret_cast<struct sockaddr *> (&ss), addrlen, host, int rc = getnameinfo (reinterpret_cast<struct sockaddr *> (&ss), addrlen,
sizeof host, NULL, 0, NI_NUMERICHOST); host, sizeof host, NULL, 0, NI_NUMERICHOST);
if (rc != 0) if (rc != 0)
return 0; return 0;
...@@ -237,7 +227,7 @@ int zmq::set_nosigpipe (fd_t s_) ...@@ -237,7 +227,7 @@ int zmq::set_nosigpipe (fd_t s_)
return 0; return 0;
} }
void zmq::bind_to_device (fd_t s_, std::string &bound_device_) void zmq::bind_to_device (fd_t s_, const std::string &bound_device_)
{ {
#ifdef ZMQ_HAVE_SO_BINDTODEVICE #ifdef ZMQ_HAVE_SO_BINDTODEVICE
int rc = setsockopt (s_, SOL_SOCKET, SO_BINDTODEVICE, int rc = setsockopt (s_, SOL_SOCKET, SO_BINDTODEVICE,
......
...@@ -56,7 +56,7 @@ void set_ip_type_of_service (fd_t s_, int iptos_); ...@@ -56,7 +56,7 @@ void set_ip_type_of_service (fd_t s_, int iptos_);
int set_nosigpipe (fd_t s_); int set_nosigpipe (fd_t s_);
// Binds the underlying socket to the given device, eg. VRF or interface // Binds the underlying socket to the given device, eg. VRF or interface
void bind_to_device (fd_t s_, std::string &bound_device_); void bind_to_device (fd_t s_, const std::string &bound_device_);
// Initialize network subsystem. May be called multiple times. Each call must be matched by a call to shutdown_network. // Initialize network subsystem. May be called multiple times. Each call must be matched by a call to shutdown_network.
bool initialize_network (); bool initialize_network ();
......
...@@ -49,12 +49,10 @@ const struct sockaddr *zmq::ip_addr_t::as_sockaddr () const ...@@ -49,12 +49,10 @@ const struct sockaddr *zmq::ip_addr_t::as_sockaddr () const
return &generic; return &generic;
} }
socklen_t zmq::ip_addr_t::sockaddr_len () const zmq::zmq_socklen_t zmq::ip_addr_t::sockaddr_len () const
{ {
if (family () == AF_INET6) { return static_cast<zmq_socklen_t> (family () == AF_INET6 ? sizeof (ipv6)
return sizeof (ipv6); : sizeof (ipv4));
}
return sizeof (ipv4);
} }
void zmq::ip_addr_t::set_port (uint16_t port_) void zmq::ip_addr_t::set_port (uint16_t port_)
......
...@@ -35,6 +35,8 @@ ...@@ -35,6 +35,8 @@
#include <netinet/in.h> #include <netinet/in.h>
#endif #endif
#include "address.hpp"
namespace zmq namespace zmq
{ {
union ip_addr_t union ip_addr_t
...@@ -48,7 +50,7 @@ union ip_addr_t ...@@ -48,7 +50,7 @@ union ip_addr_t
uint16_t port () const; uint16_t port () const;
const struct sockaddr *as_sockaddr () const; const struct sockaddr *as_sockaddr () const;
socklen_t sockaddr_len () const; zmq_socklen_t sockaddr_len () const;
void set_port (uint16_t); void set_port (uint16_t);
......
...@@ -144,11 +144,7 @@ zmq::fd_t zmq::ipc_connecter_t::connect () ...@@ -144,11 +144,7 @@ zmq::fd_t zmq::ipc_connecter_t::connect ()
// Following code should handle both Berkeley-derived socket // Following code should handle both Berkeley-derived socket
// implementations and Solaris. // implementations and Solaris.
int err = 0; int err = 0;
#if defined ZMQ_HAVE_HPUX zmq_socklen_t len = static_cast<zmq_socklen_t> (sizeof (err));
int len = sizeof (err);
#else
socklen_t len = sizeof (err);
#endif
int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR, int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char *> (&err), &len); reinterpret_cast<char *> (&err), &len);
if (rc == -1) { if (rc == -1) {
......
...@@ -58,64 +58,23 @@ zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_, ...@@ -58,64 +58,23 @@ zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
address_t *addr_, address_t *addr_,
address_t *proxy_addr_, address_t *proxy_addr_,
bool delayed_start_) : bool delayed_start_) :
own_t (io_thread_, options_), stream_connecter_base_t (
io_object_t (io_thread_), io_thread_, session_, options_, addr_, delayed_start_),
_addr (addr_),
_proxy_addr (proxy_addr_), _proxy_addr (proxy_addr_),
_status (unplugged), _status (unplugged)
_s (retired_fd),
_handle (static_cast<handle_t> (NULL)),
_handle_valid (false),
_delayed_start (delayed_start_),
_timer_started (false),
_session (session_),
_current_reconnect_ivl (options.reconnect_ivl)
{ {
zmq_assert (_addr);
zmq_assert (_addr->protocol == protocol_name::tcp); zmq_assert (_addr->protocol == protocol_name::tcp);
_proxy_addr->to_string (_endpoint); _proxy_addr->to_string (_endpoint);
_socket = _session->get_socket ();
} }
zmq::socks_connecter_t::~socks_connecter_t () zmq::socks_connecter_t::~socks_connecter_t ()
{ {
zmq_assert (_s == retired_fd);
LIBZMQ_DELETE (_proxy_addr); LIBZMQ_DELETE (_proxy_addr);
} }
void zmq::socks_connecter_t::process_plug ()
{
if (_delayed_start)
start_timer ();
else
initiate_connect ();
}
void zmq::socks_connecter_t::process_term (int linger_)
{
switch (_status) {
case unplugged:
break;
case waiting_for_reconnect_time:
cancel_timer (reconnect_timer_id);
break;
case waiting_for_proxy_connection:
case sending_greeting:
case waiting_for_choice:
case sending_request:
case waiting_for_response:
rm_fd (_handle);
if (_s != retired_fd)
close ();
break;
}
own_t::process_term (linger_);
}
void zmq::socks_connecter_t::in_event () void zmq::socks_connecter_t::in_event ()
{ {
zmq_assert (_status != unplugged && _status != waiting_for_reconnect_time); zmq_assert (_status != unplugged);
if (_status == waiting_for_choice) { if (_status == waiting_for_choice) {
int rc = _choice_decoder.input (_s); int rc = _choice_decoder.input (_s);
...@@ -127,7 +86,7 @@ void zmq::socks_connecter_t::in_event () ...@@ -127,7 +86,7 @@ void zmq::socks_connecter_t::in_event ()
if (rc == -1) if (rc == -1)
error (); error ();
else { else {
std::string hostname = ""; std::string hostname;
uint16_t port = 0; uint16_t port = 0;
if (parse_address (_addr->address, hostname, port) == -1) if (parse_address (_addr->address, hostname, port) == -1)
error (); error ();
...@@ -150,26 +109,11 @@ void zmq::socks_connecter_t::in_event () ...@@ -150,26 +109,11 @@ void zmq::socks_connecter_t::in_event ()
if (rc == -1) if (rc == -1)
error (); error ();
else { else {
const endpoint_uri_pair_t endpoint_pair = endpoint_uri_pair_t ( rm_handle ();
get_socket_name<tcp_address_t> (_s, socket_end_local), create_engine (
_endpoint, endpoint_type_connect); _s, get_socket_name<tcp_address_t> (_s, socket_end_local));
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow)
stream_engine_t (_s, options, endpoint_pair);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
send_attach (_session, engine);
_socket->event_connected (endpoint_pair, _s);
rm_fd (_handle);
_s = -1; _s = -1;
_status = unplugged; _status = unplugged;
// Shut the connecter down.
terminate ();
} }
} }
} else } else
...@@ -213,8 +157,10 @@ void zmq::socks_connecter_t::out_event () ...@@ -213,8 +157,10 @@ void zmq::socks_connecter_t::out_event ()
} }
} }
void zmq::socks_connecter_t::initiate_connect () void zmq::socks_connecter_t::start_connecting ()
{ {
zmq_assert (_status == unplugged);
// Open the connecting socket. // Open the connecting socket.
const int rc = connect_to_proxy (); const int rc = connect_to_proxy ();
...@@ -236,7 +182,7 @@ void zmq::socks_connecter_t::initiate_connect () ...@@ -236,7 +182,7 @@ void zmq::socks_connecter_t::initiate_connect ()
else { else {
if (_s != retired_fd) if (_s != retired_fd)
close (); close ();
start_timer (); add_reconnect_timer ();
} }
} }
...@@ -253,13 +199,6 @@ int zmq::socks_connecter_t::process_server_response ( ...@@ -253,13 +199,6 @@ int zmq::socks_connecter_t::process_server_response (
return response_.response_code == 0 ? 0 : -1; return response_.response_code == 0 ? 0 : -1;
} }
void zmq::socks_connecter_t::timer_event (int id_)
{
zmq_assert (_status == waiting_for_reconnect_time);
zmq_assert (id_ == reconnect_timer_id);
initiate_connect ();
}
void zmq::socks_connecter_t::error () void zmq::socks_connecter_t::error ()
{ {
rm_fd (_handle); rm_fd (_handle);
...@@ -268,34 +207,7 @@ void zmq::socks_connecter_t::error () ...@@ -268,34 +207,7 @@ void zmq::socks_connecter_t::error ()
_choice_decoder.reset (); _choice_decoder.reset ();
_request_encoder.reset (); _request_encoder.reset ();
_response_decoder.reset (); _response_decoder.reset ();
start_timer (); add_reconnect_timer ();
}
void zmq::socks_connecter_t::start_timer ()
{
if (options.reconnect_ivl != -1) {
const int interval = get_new_reconnect_ivl ();
add_timer (interval, reconnect_timer_id);
_status = waiting_for_reconnect_time;
_socket->event_connect_retried (
make_unconnected_connect_endpoint_pair (_endpoint), interval);
}
}
int zmq::socks_connecter_t::get_new_reconnect_ivl ()
{
// The new interval is the current interval + random value.
const int interval =
_current_reconnect_ivl + generate_random () % options.reconnect_ivl;
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0
&& options.reconnect_ivl_max > options.reconnect_ivl)
// Calculate the next interval
_current_reconnect_ivl =
std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max);
return interval;
} }
int zmq::socks_connecter_t::connect_to_proxy () int zmq::socks_connecter_t::connect_to_proxy ()
...@@ -303,49 +215,26 @@ int zmq::socks_connecter_t::connect_to_proxy () ...@@ -303,49 +215,26 @@ int zmq::socks_connecter_t::connect_to_proxy ()
zmq_assert (_s == retired_fd); zmq_assert (_s == retired_fd);
// Resolve the address // Resolve the address
LIBZMQ_DELETE (_proxy_addr->resolved.tcp_addr); if (_addr->resolved.tcp_addr != NULL) {
_proxy_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t (); LIBZMQ_DELETE (_addr->resolved.tcp_addr);
alloc_assert (_proxy_addr->resolved.tcp_addr);
int rc = _proxy_addr->resolved.tcp_addr->resolve (
_proxy_addr->address.c_str (), false, options.ipv6);
if (rc != 0) {
LIBZMQ_DELETE (_proxy_addr->resolved.tcp_addr);
return -1;
} }
zmq_assert (_proxy_addr->resolved.tcp_addr != NULL);
const tcp_address_t *tcp_addr = _proxy_addr->resolved.tcp_addr;
// Create the socket. _addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
_s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP); alloc_assert (_addr->resolved.tcp_addr);
if (_s == retired_fd) // Automatic fallback to ipv4 is disabled here since this was the existing
// behaviour, however I don't see a real reason for this. Maybe this can
// be changed to true (and then the parameter can be removed entirely).
_s = tcp_open_socket (_addr->address.c_str (), options, false,
_addr->resolved.tcp_addr);
if (_s == retired_fd) {
LIBZMQ_DELETE (_addr->resolved.tcp_addr);
return -1; return -1;
}
zmq_assert (_addr->resolved.tcp_addr != NULL);
// On some systems, IPv4 mapping in IPv6 sockets is disabled by default. const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr;
// Switch it on in such cases.
if (tcp_addr->family () == AF_INET6)
enable_ipv4_mapping (_s);
// Set the IP Type-Of-Service priority for this socket
if (options.tos != 0)
set_ip_type_of_service (_s, options.tos);
// Bind the socket to a device if applicable
if (!options.bound_device.empty ())
bind_to_device (_s, options.bound_device);
// Set the socket to non-blocking mode so that we get async connect().
unblock_socket (_s);
// Set the socket buffer limits for the underlying socket.
if (options.sndbuf >= 0)
set_tcp_send_buffer (_s, options.sndbuf);
if (options.rcvbuf >= 0)
set_tcp_receive_buffer (_s, options.rcvbuf);
// Set the IP Type-Of-Service for the underlying socket int rc;
if (options.tos != 0)
set_ip_type_of_service (_s, options.tos);
// Set a source address for conversations // Set a source address for conversations
if (tcp_addr->has_src_addr ()) { if (tcp_addr->has_src_addr ()) {
...@@ -439,21 +328,6 @@ zmq::fd_t zmq::socks_connecter_t::check_proxy_connection () ...@@ -439,21 +328,6 @@ zmq::fd_t zmq::socks_connecter_t::check_proxy_connection ()
return 0; return 0;
} }
void zmq::socks_connecter_t::close ()
{
zmq_assert (_s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
const int rc = closesocket (_s);
wsa_assert (rc != SOCKET_ERROR);
#else
const int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (make_unconnected_connect_endpoint_pair (_endpoint),
_s);
_s = retired_fd;
}
int zmq::socks_connecter_t::parse_address (const std::string &address_, int zmq::socks_connecter_t::parse_address (const std::string &address_,
std::string &hostname_, std::string &hostname_,
uint16_t &port_) uint16_t &port_)
......
...@@ -31,8 +31,7 @@ ...@@ -31,8 +31,7 @@
#define __SOCKS_CONNECTER_HPP_INCLUDED__ #define __SOCKS_CONNECTER_HPP_INCLUDED__
#include "fd.hpp" #include "fd.hpp"
#include "io_object.hpp" #include "stream_connecter_base.hpp"
#include "own.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "socks.hpp" #include "socks.hpp"
...@@ -42,8 +41,7 @@ class io_thread_t; ...@@ -42,8 +41,7 @@ class io_thread_t;
class session_base_t; class session_base_t;
struct address_t; struct address_t;
// TODO consider refactoring this to derive from stream_connecter_base_t class socks_connecter_t : public stream_connecter_base_t
class socks_connecter_t : public own_t, public io_object_t
{ {
public: public:
// If 'delayed_start' is true connecter first waits for a while, // If 'delayed_start' is true connecter first waits for a while,
...@@ -68,29 +66,18 @@ class socks_connecter_t : public own_t, public io_object_t ...@@ -68,29 +66,18 @@ class socks_connecter_t : public own_t, public io_object_t
waiting_for_response waiting_for_response
}; };
// ID of the timer used to delay the reconnection.
enum
{
reconnect_timer_id = 1
};
// Method ID // Method ID
enum enum
{ {
socks_no_auth_required = 0 socks_no_auth_required = 0
}; };
// Handlers for incoming commands.
virtual void process_plug ();
virtual void process_term (int linger_);
// Handlers for I/O events. // Handlers for I/O events.
virtual void in_event (); virtual void in_event ();
virtual void out_event (); virtual void out_event ();
virtual void timer_event (int id_);
// Internal function to start the actual connection establishment. // Internal function to start the actual connection establishment.
void initiate_connect (); void start_connecting ();
int process_server_response (const socks_choice_t &response_); int process_server_response (const socks_choice_t &response_);
int process_server_response (const socks_response_t &response_); int process_server_response (const socks_response_t &response_);
...@@ -103,22 +90,11 @@ class socks_connecter_t : public own_t, public io_object_t ...@@ -103,22 +90,11 @@ class socks_connecter_t : public own_t, public io_object_t
void error (); void error ();
// Internal function to start reconnect timer
void start_timer ();
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int get_new_reconnect_ivl ();
// Open TCP connecting socket. Returns -1 in case of error, // Open TCP connecting socket. Returns -1 in case of error,
// 0 if connect was successful immediately. Returns -1 with // 0 if connect was successful immediately. Returns -1 with
// EAGAIN errno if async connect was launched. // EAGAIN errno if async connect was launched.
int open (); int open ();
// Close the connecting socket.
void close ();
// Get the file descriptor of newly created connection. Returns // Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessful. // retired_fd if the connection was unsuccessful.
zmq::fd_t check_proxy_connection (); zmq::fd_t check_proxy_connection ();
...@@ -128,42 +104,11 @@ class socks_connecter_t : public own_t, public io_object_t ...@@ -128,42 +104,11 @@ class socks_connecter_t : public own_t, public io_object_t
socks_request_encoder_t _request_encoder; socks_request_encoder_t _request_encoder;
socks_response_decoder_t _response_decoder; socks_response_decoder_t _response_decoder;
// Address to connect to. Owned by session_base_t.
address_t *_addr;
// SOCKS address; owned by this connecter. // SOCKS address; owned by this connecter.
address_t *_proxy_addr; address_t *_proxy_addr;
int _status; int _status;
// Underlying socket.
fd_t _s;
// Handle corresponding to the listening socket.
handle_t _handle;
// If true file descriptor is registered with the poller and 'handle'
// contains valid value.
bool _handle_valid;
// If true, connecter is waiting a while before trying to connect.
const bool _delayed_start;
// True iff a timer has been started.
bool _timer_started;
// Reference to the session we belong to.
zmq::session_base_t *_session;
// Current reconnect ivl, updated for backoff strategy
int _current_reconnect_ivl;
// String representation of endpoint to connect to
std::string _endpoint;
// Socket
zmq::socket_base_t *_socket;
socks_connecter_t (const socks_connecter_t &); socks_connecter_t (const socks_connecter_t &);
const socks_connecter_t &operator= (const socks_connecter_t &); const socks_connecter_t &operator= (const socks_connecter_t &);
}; };
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "ip.hpp" #include "ip.hpp"
#include "tcp.hpp" #include "tcp.hpp"
#include "err.hpp" #include "err.hpp"
#include "options.hpp"
#if !defined ZMQ_HAVE_WINDOWS #if !defined ZMQ_HAVE_WINDOWS
#include <fcntl.h> #include <fcntl.h>
...@@ -381,3 +382,60 @@ void zmq::tcp_tune_loopback_fast_path (const fd_t socket_) ...@@ -381,3 +382,60 @@ void zmq::tcp_tune_loopback_fast_path (const fd_t socket_)
LIBZMQ_UNUSED (socket_); LIBZMQ_UNUSED (socket_);
#endif #endif
} }
zmq::fd_t zmq::tcp_open_socket (const char *address_,
const zmq::options_t &options_,
bool fallback_to_ipv4_,
zmq::tcp_address_t *out_tcp_addr_)
{
// Convert the textual address into address structure.
int rc = out_tcp_addr_->resolve (address_, true, options_.ipv6);
if (rc != 0)
return retired_fd;
// Create the socket.
fd_t s = open_socket (out_tcp_addr_->family (), SOCK_STREAM, IPPROTO_TCP);
// IPv6 address family not supported, try automatic downgrade to IPv4.
if (s == retired_fd && fallback_to_ipv4_
&& out_tcp_addr_->family () == AF_INET6 && errno == EAFNOSUPPORT
&& options_.ipv6) {
rc = out_tcp_addr_->resolve (address_, false, false);
if (rc != 0) {
return retired_fd;
}
s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
}
if (s == retired_fd) {
return retired_fd;
}
// On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
// Switch it on in such cases.
if (out_tcp_addr_->family () == AF_INET6)
enable_ipv4_mapping (s);
// Set the IP Type-Of-Service priority for this socket
if (options_.tos != 0)
set_ip_type_of_service (s, options_.tos);
// Set the socket to loopback fastpath if configured.
if (options_.loopback_fastpath)
tcp_tune_loopback_fast_path (s);
// Bind the socket to a device if applicable
if (!options_.bound_device.empty ())
bind_to_device (s, options_.bound_device);
// Set the socket to non-blocking mode so that we get async connect().
unblock_socket (s);
// Set the socket buffer limits for the underlying socket.
if (options_.sndbuf >= 0)
set_tcp_send_buffer (s, options_.sndbuf);
if (options_.rcvbuf >= 0)
set_tcp_receive_buffer (s, options_.rcvbuf);
return s;
}
...@@ -34,6 +34,9 @@ ...@@ -34,6 +34,9 @@
namespace zmq namespace zmq
{ {
class tcp_address_t;
struct options_t;
// Tunes the supplied TCP socket for the best latency. // Tunes the supplied TCP socket for the best latency.
int tune_tcp_socket (fd_t s_); int tune_tcp_socket (fd_t s_);
...@@ -68,6 +71,16 @@ int tcp_read (fd_t s_, void *data_, size_t size_); ...@@ -68,6 +71,16 @@ int tcp_read (fd_t s_, void *data_, size_t size_);
void tcp_assert_tuning_error (fd_t s_, int rc_); void tcp_assert_tuning_error (fd_t s_, int rc_);
void tcp_tune_loopback_fast_path (const fd_t socket_); void tcp_tune_loopback_fast_path (const fd_t socket_);
// Resolves the given address_ string, opens a socket and sets socket options
// according to the passed options_. On success, returns the socket
// descriptor and assigns the resolved address to out_tcp_addr_. In case of
// an error, retired_fd is returned, and the value of out_tcp_addr_ is undefined.
// errno is set to an error code describing the cause of the error.
fd_t tcp_open_socket (const char *address_,
const options_t &options_,
bool fallback_to_ipv4_,
tcp_address_t *out_tcp_addr_);
} }
#endif #endif
...@@ -174,63 +174,17 @@ int zmq::tcp_connecter_t::open () ...@@ -174,63 +174,17 @@ int zmq::tcp_connecter_t::open ()
_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t (); _addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
alloc_assert (_addr->resolved.tcp_addr); alloc_assert (_addr->resolved.tcp_addr);
int rc = _addr->resolved.tcp_addr->resolve (_addr->address.c_str (), false, _s = tcp_open_socket (_addr->address.c_str (), options, true,
options.ipv6); _addr->resolved.tcp_addr);
if (rc != 0) { if (_s == retired_fd) {
LIBZMQ_DELETE (_addr->resolved.tcp_addr); LIBZMQ_DELETE (_addr->resolved.tcp_addr);
return -1; return -1;
} }
zmq_assert (_addr->resolved.tcp_addr != NULL); zmq_assert (_addr->resolved.tcp_addr != NULL);
const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr;
// Create the socket.
_s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
// IPv6 address family not supported, try automatic downgrade to IPv4.
if (_s == zmq::retired_fd && tcp_addr->family () == AF_INET6
&& errno == EAFNOSUPPORT && options.ipv6) {
rc = _addr->resolved.tcp_addr->resolve (_addr->address.c_str (), false,
false);
if (rc != 0) {
LIBZMQ_DELETE (_addr->resolved.tcp_addr);
return -1;
}
_s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
}
if (_s == retired_fd) {
return -1;
}
// On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
// Switch it on in such cases.
if (tcp_addr->family () == AF_INET6)
enable_ipv4_mapping (_s);
// Set the IP Type-Of-Service priority for this socket
if (options.tos != 0)
set_ip_type_of_service (_s, options.tos);
// Bind the socket to a device if applicable const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr;
if (!options.bound_device.empty ())
bind_to_device (_s, options.bound_device);
// Set the socket to non-blocking mode so that we get async connect().
unblock_socket (_s);
// Set the socket to loopback fastpath if configured.
if (options.loopback_fastpath)
tcp_tune_loopback_fast_path (_s);
// Set the socket buffer limits for the underlying socket.
if (options.sndbuf >= 0)
set_tcp_send_buffer (_s, options.sndbuf);
if (options.rcvbuf >= 0)
set_tcp_receive_buffer (_s, options.rcvbuf);
// Set the IP Type-Of-Service for the underlying socket int rc;
if (options.tos != 0)
set_ip_type_of_service (_s, options.tos);
// Set a source address for conversations // Set a source address for conversations
if (tcp_addr->has_src_addr ()) { if (tcp_addr->has_src_addr ()) {
......
...@@ -103,54 +103,23 @@ zmq::tcp_listener_t::get_socket_name (zmq::fd_t fd_, ...@@ -103,54 +103,23 @@ zmq::tcp_listener_t::get_socket_name (zmq::fd_t fd_,
int zmq::tcp_listener_t::create_socket (const char *addr_) int zmq::tcp_listener_t::create_socket (const char *addr_)
{ {
// Convert the textual address into address structure. _s = tcp_open_socket (addr_, options, true, &_address);
int rc = _address.resolve (addr_, true, options.ipv6);
if (rc != 0)
return -1;
// Create a listening socket.
_s = open_socket (_address.family (), SOCK_STREAM, IPPROTO_TCP);
// IPv6 address family not supported, try automatic downgrade to IPv4.
if (_s == zmq::retired_fd && _address.family () == AF_INET6
&& errno == EAFNOSUPPORT && options.ipv6) {
rc = _address.resolve (addr_, true, false);
if (rc != 0)
return rc;
_s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
}
if (_s == retired_fd) { if (_s == retired_fd) {
return -1; return -1;
} }
make_socket_noninheritable (_s);
// On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
// Switch it on in such cases.
if (_address.family () == AF_INET6)
enable_ipv4_mapping (_s);
// Set the IP Type-Of-Service for the underlying socket
if (options.tos != 0)
set_ip_type_of_service (_s, options.tos);
// Set the socket to loopback fastpath if configured. // TODO why is this only done for the listener?
if (options.loopback_fastpath) make_socket_noninheritable (_s);
tcp_tune_loopback_fast_path (_s);
// Bind the socket to a device if applicable
if (!options.bound_device.empty ())
bind_to_device (_s, options.bound_device);
// Set the socket buffer limits for the underlying socket.
if (options.sndbuf >= 0)
set_tcp_send_buffer (_s, options.sndbuf);
if (options.rcvbuf >= 0)
set_tcp_receive_buffer (_s, options.rcvbuf);
// Allow reusing of the address. // Allow reusing of the address.
int flag = 1; int flag = 1;
int rc;
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
// TODO this was changed for Windows from SO_REUSEADDRE to
// SE_EXCLUSIVEADDRUSE by 0ab65324195ad70205514d465b03d851a6de051c,
// so the comment above is no longer correct; also, now the settings are
// different between listener and connecter with a src address.
// is this intentional?
rc = setsockopt (_s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, rc = setsockopt (_s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
reinterpret_cast<const char *> (&flag), sizeof (int)); reinterpret_cast<const char *> (&flag), sizeof (int));
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
......
...@@ -198,7 +198,8 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) ...@@ -198,7 +198,8 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
} else { } else {
/// XXX fixme ? /// XXX fixme ?
_out_address = reinterpret_cast<sockaddr *> (&_raw_address); _out_address = reinterpret_cast<sockaddr *> (&_raw_address);
_out_address_len = sizeof (sockaddr_in); _out_address_len =
static_cast<zmq_socklen_t> (sizeof (sockaddr_in));
} }
set_pollout (_handle); set_pollout (_handle);
...@@ -319,19 +320,26 @@ void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg_, sockaddr_in *addr_) ...@@ -319,19 +320,26 @@ void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg_, sockaddr_in *addr_)
const char *const name = inet_ntoa (addr_->sin_addr); const char *const name = inet_ntoa (addr_->sin_addr);
char port[6]; char port[6];
const int port_len =
sprintf (port, "%d", static_cast<int> (ntohs (addr_->sin_port))); sprintf (port, "%d", static_cast<int> (ntohs (addr_->sin_port)));
zmq_assert (port_len > 0);
const int size = static_cast<int> (strlen (name)) const size_t name_len = strlen (name);
+ static_cast<int> (strlen (port)) + 1 const int size = static_cast<int> (name_len) + 1 /* colon */
+ 1; // Colon + NULL + port_len + 1; // terminating NUL
const int rc = msg_->init_size (size); const int rc = msg_->init_size (size);
errno_assert (rc == 0); errno_assert (rc == 0);
msg_->set_flags (msg_t::more); msg_->set_flags (msg_t::more);
char *address = static_cast<char *> (msg_->data ());
strcpy (address, name); // use memcpy instead of strcpy/strcat, since this is more efficient when
strcat (address, ":"); // we already know the lengths, which we calculated above
strcat (address, port); char *address = static_cast<char *> (msg_->data ());
memcpy (address, name, name_len);
address += name_len;
*address++ = ':';
memcpy (address, port, static_cast<size_t> (port_len));
address += port_len;
*address = 0;
} }
int zmq::udp_engine_t::resolve_raw_address (char *name_, size_t length_) int zmq::udp_engine_t::resolve_raw_address (char *name_, size_t length_)
...@@ -430,11 +438,11 @@ void zmq::udp_engine_t::out_event () ...@@ -430,11 +438,11 @@ void zmq::udp_engine_t::out_event ()
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
rc = sendto (_fd, _out_buffer, static_cast<int> (size), 0, _out_address, rc = sendto (_fd, _out_buffer, static_cast<int> (size), 0, _out_address,
static_cast<int> (_out_address_len)); _out_address_len);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
#elif defined ZMQ_HAVE_VXWORKS #elif defined ZMQ_HAVE_VXWORKS
rc = sendto (_fd, reinterpret_cast<caddr_t> (_out_buffer), size, 0, rc = sendto (_fd, reinterpret_cast<caddr_t> (_out_buffer), size, 0,
(sockaddr *) _out_address, (int) _out_address_len); (sockaddr *) _out_address, _out_address_len);
errno_assert (rc != -1); errno_assert (rc != -1);
#else #else
rc = sendto (_fd, _out_buffer, size, 0, _out_address, _out_address_len); rc = sendto (_fd, _out_buffer, size, 0, _out_address, _out_address_len);
...@@ -465,37 +473,23 @@ void zmq::udp_engine_t::restart_output () ...@@ -465,37 +473,23 @@ void zmq::udp_engine_t::restart_output ()
void zmq::udp_engine_t::in_event () void zmq::udp_engine_t::in_event ()
{ {
sockaddr_storage in_address; sockaddr_storage in_address;
socklen_t in_addrlen = sizeof (sockaddr_storage); zmq_socklen_t in_addrlen =
#ifdef ZMQ_HAVE_WINDOWS static_cast<zmq_socklen_t> (sizeof (sockaddr_storage));
int nbytes =
const int nbytes =
recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0, recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0,
reinterpret_cast<sockaddr *> (&in_address), &in_addrlen); reinterpret_cast<sockaddr *> (&in_address), &in_addrlen);
const int last_error = WSAGetLastError (); #ifdef ZMQ_HAVE_WINDOWS
if (nbytes == SOCKET_ERROR) { if (nbytes == SOCKET_ERROR) {
const int last_error = WSAGetLastError ();
wsa_assert (last_error == WSAENETDOWN || last_error == WSAENETRESET wsa_assert (last_error == WSAENETDOWN || last_error == WSAENETRESET
|| last_error == WSAEWOULDBLOCK); || last_error == WSAEWOULDBLOCK);
return; return;
} }
#elif defined ZMQ_HAVE_VXWORKS
int nbytes = recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0,
(sockaddr *) &in_address, (int *) &in_addrlen);
if (nbytes == -1) {
errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM
&& errno != ENOTSOCK);
return;
}
#else #else
int nbytes =
recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0,
reinterpret_cast<sockaddr *> (&in_address), &in_addrlen);
if (nbytes == -1) { if (nbytes == -1) {
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM
&& errno != ENOTSOCK); && errno != ENOTSOCK);
#else
errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM
&& errno != ENOTSOCK);
#endif
return; return;
} }
#endif #endif
......
...@@ -62,7 +62,7 @@ class udp_engine_t : public io_object_t, public i_engine ...@@ -62,7 +62,7 @@ class udp_engine_t : public io_object_t, public i_engine
sockaddr_in _raw_address; sockaddr_in _raw_address;
const struct sockaddr *_out_address; const struct sockaddr *_out_address;
socklen_t _out_address_len; zmq_socklen_t _out_address_len;
char _out_buffer[MAX_UDP_MSG]; char _out_buffer[MAX_UDP_MSG];
char _in_buffer[MAX_UDP_MSG]; char _in_buffer[MAX_UDP_MSG];
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment