Commit 668b2c4d authored by Ian Barber's avatar Ian Barber

Merge pull request #1083 from hurtonm/master

Code cleanup
parents 8c616290 706eb4da
...@@ -59,12 +59,12 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, ...@@ -59,12 +59,12 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
delayed_start (delayed_start_), delayed_start (delayed_start_),
timer_started (false), timer_started (false),
session (session_), session (session_),
current_reconnect_ivl(options.reconnect_ivl) current_reconnect_ivl (options.reconnect_ivl)
{ {
zmq_assert (addr); zmq_assert (addr);
zmq_assert (addr->protocol == "tcp"); zmq_assert (addr->protocol == "tcp");
addr->to_string (endpoint); addr->to_string (endpoint);
socket = session-> get_socket(); socket = session->get_socket ();
} }
zmq::tcp_connecter_t::~tcp_connecter_t () zmq::tcp_connecter_t::~tcp_connecter_t ()
...@@ -110,14 +110,14 @@ void zmq::tcp_connecter_t::in_event () ...@@ -110,14 +110,14 @@ void zmq::tcp_connecter_t::in_event ()
void zmq::tcp_connecter_t::out_event () void zmq::tcp_connecter_t::out_event ()
{ {
fd_t fd = connect ();
rm_fd (handle); rm_fd (handle);
handle_valid = false; handle_valid = false;
const fd_t fd = connect ();
// Handle the error condition by attempt to reconnect. // Handle the error condition by attempt to reconnect.
if (fd == retired_fd) { if (fd == retired_fd) {
close (); close ();
add_reconnect_timer(); add_reconnect_timer ();
return; return;
} }
...@@ -125,7 +125,7 @@ void zmq::tcp_connecter_t::out_event () ...@@ -125,7 +125,7 @@ void zmq::tcp_connecter_t::out_event ()
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
// remember our fd for ZMQ_SRCFD in messages // remember our fd for ZMQ_SRCFD in messages
socket->set_fd(fd); socket->set_fd (fd);
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t *engine = new (std::nothrow)
...@@ -151,7 +151,7 @@ void zmq::tcp_connecter_t::timer_event (int id_) ...@@ -151,7 +151,7 @@ void zmq::tcp_connecter_t::timer_event (int id_)
void zmq::tcp_connecter_t::start_connecting () void zmq::tcp_connecter_t::start_connecting ()
{ {
// Open the connecting socket. // Open the connecting socket.
int rc = open (); const int rc = open ();
// Connect may succeed in synchronous manner. // Connect may succeed in synchronous manner.
if (rc == 0) { if (rc == 0) {
...@@ -177,32 +177,28 @@ void zmq::tcp_connecter_t::start_connecting () ...@@ -177,32 +177,28 @@ void zmq::tcp_connecter_t::start_connecting ()
} }
} }
void zmq::tcp_connecter_t::add_reconnect_timer() void zmq::tcp_connecter_t::add_reconnect_timer ()
{ {
int rc_ivl = get_new_reconnect_ivl(); const int interval = get_new_reconnect_ivl ();
add_timer (rc_ivl, reconnect_timer_id); add_timer (interval, reconnect_timer_id);
socket->event_connect_retried (endpoint, rc_ivl); socket->event_connect_retried (endpoint, interval);
timer_started = true; timer_started = true;
} }
int zmq::tcp_connecter_t::get_new_reconnect_ivl () int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
{ {
// The new interval is the current interval + random value. // The new interval is the current interval + random value.
int this_interval = current_reconnect_ivl + const int interval = current_reconnect_ivl +
(generate_random () % options.reconnect_ivl); generate_random () % options.reconnect_ivl;
// Only change the current reconnect interval if the maximum reconnect // Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval. // interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0 && if (options.reconnect_ivl_max > 0 &&
options.reconnect_ivl_max > options.reconnect_ivl) { options.reconnect_ivl_max > options.reconnect_ivl)
// Calculate the next interval // Calculate the next interval
current_reconnect_ivl = current_reconnect_ivl * 2; current_reconnect_ivl =
if(current_reconnect_ivl >= options.reconnect_ivl_max) { std::min (current_reconnect_ivl * 2, options.reconnect_ivl_max);
current_reconnect_ivl = options.reconnect_ivl_max; return interval;
}
}
return this_interval;
} }
int zmq::tcp_connecter_t::open () int zmq::tcp_connecter_t::open ()
...@@ -214,7 +210,6 @@ int zmq::tcp_connecter_t::open () ...@@ -214,7 +210,6 @@ int zmq::tcp_connecter_t::open ()
delete addr->resolved.tcp_addr; delete addr->resolved.tcp_addr;
addr->resolved.tcp_addr = NULL; addr->resolved.tcp_addr = NULL;
} }
zmq_assert (addr->resolved.tcp_addr == NULL);
addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t (); addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
alloc_assert (addr->resolved.tcp_addr); alloc_assert (addr->resolved.tcp_addr);
...@@ -226,9 +221,10 @@ int zmq::tcp_connecter_t::open () ...@@ -226,9 +221,10 @@ int zmq::tcp_connecter_t::open ()
return -1; return -1;
} }
zmq_assert (addr->resolved.tcp_addr != NULL); zmq_assert (addr->resolved.tcp_addr != NULL);
tcp_address_t * const tcp_addr = addr->resolved.tcp_addr;
// Create the socket. // Create the socket.
s = open_socket (addr->resolved.tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP); s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET) { if (s == INVALID_SOCKET) {
errno = wsa_error_to_errno (WSAGetLastError ()); errno = wsa_error_to_errno (WSAGetLastError ());
...@@ -241,7 +237,7 @@ int zmq::tcp_connecter_t::open () ...@@ -241,7 +237,7 @@ int zmq::tcp_connecter_t::open ()
// On some systems, IPv4 mapping in IPv6 sockets is disabled by default. // On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
// Switch it on in such cases. // Switch it on in such cases.
if (addr->resolved.tcp_addr->family () == AF_INET6) if (tcp_addr->family () == AF_INET6)
enable_ipv4_mapping (s); enable_ipv4_mapping (s);
// Set the IP Type-Of-Service priority for this socket // Set the IP Type-Of-Service priority for this socket
...@@ -262,18 +258,14 @@ int zmq::tcp_connecter_t::open () ...@@ -262,18 +258,14 @@ int zmq::tcp_connecter_t::open ()
set_ip_type_of_service (s, options.tos); set_ip_type_of_service (s, options.tos);
// Set a source address for conversations // Set a source address for conversations
if (addr->resolved.tcp_addr->has_src_addr ()) { if (tcp_addr->has_src_addr ()) {
rc = ::bind (s, addr->resolved.tcp_addr->src_addr (), addr->resolved.tcp_addr->src_addrlen ()); rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
if (rc == -1)
if (rc == -1) {
return -1; return -1;
} }
}
// Connect to the remote peer. // Connect to the remote peer.
rc = ::connect ( rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
s, addr->resolved.tcp_addr->addr (),
addr->resolved.tcp_addr->addrlen ());
// Connect was successfull immediately. // Connect was successfull immediately.
if (rc == 0) if (rc == 0)
...@@ -298,33 +290,31 @@ zmq::fd_t zmq::tcp_connecter_t::connect () ...@@ -298,33 +290,31 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
{ {
// Async connect has finished. Check whether an error occurred // Async connect has finished. Check whether an error occurred
int err = 0; int err = 0;
#if defined ZMQ_HAVE_HPUX #ifdef ZMQ_HAVE_HPUX
int len = sizeof (err); int len = sizeof err;
#else #else
socklen_t len = sizeof (err); socklen_t len = sizeof err;
#endif #endif
int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len); const int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
// Assert if the error was caused by 0MQ bug. // Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert. // Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
zmq_assert (rc == 0); zmq_assert (rc == 0);
if (err != 0) { if (err != 0) {
if (err == WSAECONNREFUSED || wsa_assert (err == WSAECONNREFUSED
err == WSAETIMEDOUT || || err == WSAETIMEDOUT
err == WSAECONNABORTED || || err == WSAECONNABORTED
err == WSAEHOSTUNREACH || || err == WSAEHOSTUNREACH
err == WSAENETUNREACH || || err == WSAENETUNREACH
err == WSAENETDOWN || || err == WSAENETDOWN
err == WSAEACCES || || err == WSAEACCES
err == WSAEINVAL || || err == WSAEINVAL
err == WSAEADDRINUSE) || err == WSAEADDRINUSE);
return retired_fd; return retired_fd;
wsa_assert_no (err);
} }
#else #else
// Following code should handle both Berkeley-derived socket // Following code should handle both Berkeley-derived socket
// implementations and Solaris. // implementations and Solaris.
if (rc == -1) if (rc == -1)
...@@ -344,7 +334,7 @@ zmq::fd_t zmq::tcp_connecter_t::connect () ...@@ -344,7 +334,7 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
#endif #endif
// Return the newly connected socket. // Return the newly connected socket.
fd_t result = s; const fd_t result = s;
s = retired_fd; s = retired_fd;
return result; return result;
} }
...@@ -353,10 +343,10 @@ void zmq::tcp_connecter_t::close () ...@@ -353,10 +343,10 @@ void zmq::tcp_connecter_t::close ()
{ {
zmq_assert (s != retired_fd); zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (s); const int rc = closesocket (s);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
#else #else
int rc = ::close (s); const int rc = ::close (s);
errno_assert (rc == 0); errno_assert (rc == 0);
#endif #endif
socket->event_closed (endpoint, s); socket->event_closed (endpoint, s);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment