Commit 2c2ea827 authored by Simon Giesecke's avatar Simon Giesecke Committed by Simon Giesecke

Problem: duplicated code, redundant member handle_valid, asymmetry between

  tcp_connecter and tcp_listener

Solution: remove duplication and redundant member, align handling of
handle in tcp_connecter and tcp_listener
parent b77d7610
...@@ -67,7 +67,6 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, ...@@ -67,7 +67,6 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
addr (addr_), addr (addr_),
s (retired_fd), s (retired_fd),
handle ((handle_t) NULL), handle ((handle_t) NULL),
handle_valid (false),
delayed_start (delayed_start_), delayed_start (delayed_start_),
connect_timer_started (false), connect_timer_started (false),
reconnect_timer_started (false), reconnect_timer_started (false),
...@@ -84,7 +83,7 @@ zmq::tcp_connecter_t::~tcp_connecter_t () ...@@ -84,7 +83,7 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()
{ {
zmq_assert (!connect_timer_started); zmq_assert (!connect_timer_started);
zmq_assert (!reconnect_timer_started); zmq_assert (!reconnect_timer_started);
zmq_assert (!handle_valid); zmq_assert (!handle);
zmq_assert (s == retired_fd); zmq_assert (s == retired_fd);
} }
...@@ -108,9 +107,8 @@ void zmq::tcp_connecter_t::process_term (int linger_) ...@@ -108,9 +107,8 @@ void zmq::tcp_connecter_t::process_term (int linger_)
reconnect_timer_started = false; reconnect_timer_started = false;
} }
if (handle_valid) { if (handle) {
rm_fd (handle); rm_handle ();
handle_valid = false;
} }
if (s != retired_fd) if (s != retired_fd)
...@@ -134,25 +132,12 @@ void zmq::tcp_connecter_t::out_event () ...@@ -134,25 +132,12 @@ void zmq::tcp_connecter_t::out_event ()
connect_timer_started = false; connect_timer_started = false;
} }
rm_fd (handle); rm_handle ();
handle_valid = false;
const fd_t fd = connect (); const fd_t fd = connect ();
// Handle the error condition by attempt to reconnect. // Handle the error condition by attempt to reconnect.
if (fd == retired_fd) { if (fd == retired_fd || !tune_socket (fd)) {
close ();
add_reconnect_timer ();
return;
}
int rc = tune_tcp_socket (fd);
rc = rc
| tune_tcp_keepalives (
fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt);
if (rc != 0) {
close (); close ();
add_reconnect_timer (); add_reconnect_timer ();
return; return;
...@@ -172,15 +157,18 @@ void zmq::tcp_connecter_t::out_event () ...@@ -172,15 +157,18 @@ void zmq::tcp_connecter_t::out_event ()
socket->event_connected (endpoint, (int) fd); socket->event_connected (endpoint, (int) fd);
} }
void zmq::tcp_connecter_t::rm_handle ()
{
rm_fd (handle);
handle = (handle_t) NULL;
}
void zmq::tcp_connecter_t::timer_event (int id_) void zmq::tcp_connecter_t::timer_event (int id_)
{ {
zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id); zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
if (id_ == connect_timer_id) { if (id_ == connect_timer_id) {
connect_timer_started = false; connect_timer_started = false;
rm_handle ();
rm_fd (handle);
handle_valid = false;
close (); close ();
add_reconnect_timer (); add_reconnect_timer ();
} else if (id_ == reconnect_timer_id) { } else if (id_ == reconnect_timer_id) {
...@@ -197,14 +185,12 @@ void zmq::tcp_connecter_t::start_connecting () ...@@ -197,14 +185,12 @@ void zmq::tcp_connecter_t::start_connecting ()
// Connect may succeed in synchronous manner. // Connect may succeed in synchronous manner.
if (rc == 0) { if (rc == 0) {
handle = add_fd (s); handle = add_fd (s);
handle_valid = true;
out_event (); out_event ();
} }
// Connection establishment may be delayed. Poll for its completion. // Connection establishment may be delayed. Poll for its completion.
else if (rc == -1 && errno == EINPROGRESS) { else if (rc == -1 && errno == EINPROGRESS) {
handle = add_fd (s); handle = add_fd (s);
handle_valid = true;
set_pollout (handle); set_pollout (handle);
socket->event_connect_delayed (endpoint, zmq_errno ()); socket->event_connect_delayed (endpoint, zmq_errno ());
...@@ -346,11 +332,12 @@ int zmq::tcp_connecter_t::open () ...@@ -346,11 +332,12 @@ int zmq::tcp_connecter_t::open ()
rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ()); rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
// Connect was successful immediately. // Connect was successful immediately.
if (rc == 0) if (rc == 0) {
return 0; return 0;
}
// Translate error codes indicating asynchronous connect has been // Translate error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS. // launched to a uniform EINPROGRESS.
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
const int last_error = WSAGetLastError (); const int last_error = WSAGetLastError ();
if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK) if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
...@@ -406,6 +393,16 @@ zmq::fd_t zmq::tcp_connecter_t::connect () ...@@ -406,6 +393,16 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
return result; return result;
} }
bool zmq::tcp_connecter_t::tune_socket (const fd_t fd)
{
const int rc = tune_tcp_socket (fd)
| tune_tcp_keepalives (
fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
options.tcp_keepalive_idle, options.tcp_keepalive_intvl)
| tune_tcp_maxrt (fd, options.tcp_maxrt);
return rc == 0;
}
void zmq::tcp_connecter_t::close () void zmq::tcp_connecter_t::close ()
{ {
zmq_assert (s != retired_fd); zmq_assert (s != retired_fd);
......
...@@ -70,6 +70,9 @@ class tcp_connecter_t : public own_t, public io_object_t ...@@ -70,6 +70,9 @@ class tcp_connecter_t : public own_t, public io_object_t
void out_event (); void out_event ();
void timer_event (int id_); void timer_event (int id_);
// Removes the handle from the poller.
void rm_handle ();
// Internal function to start the actual connection establishment. // Internal function to start the actual connection establishment.
void start_connecting (); void start_connecting ();
...@@ -96,19 +99,19 @@ class tcp_connecter_t : public own_t, public io_object_t ...@@ -96,19 +99,19 @@ class tcp_connecter_t : public own_t, public io_object_t
// retired_fd if the connection was unsuccessful. // retired_fd if the connection was unsuccessful.
fd_t connect (); fd_t connect ();
// Tunes a connected socket.
bool tune_socket (fd_t fd);
// Address to connect to. Owned by session_base_t. // Address to connect to. Owned by session_base_t.
address_t *addr; address_t *addr;
// Underlying socket. // Underlying socket.
fd_t s; fd_t s;
// Handle corresponding to the listening socket. // Handle corresponding to the listening socket, if file descriptor is
// registered with the poller, or NULL.
handle_t handle; handle_t handle;
// If true file descriptor is registered with the poller and 'handle'
// contains valid value.
bool handle_valid;
// If true, connecter is waiting a while before trying to connect. // If true, connecter is waiting a while before trying to connect.
const bool delayed_start; const bool delayed_start;
......
...@@ -71,6 +71,7 @@ zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_, ...@@ -71,6 +71,7 @@ zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
zmq::tcp_listener_t::~tcp_listener_t () zmq::tcp_listener_t::~tcp_listener_t ()
{ {
zmq_assert (s == retired_fd); zmq_assert (s == retired_fd);
zmq_assert (!handle);
} }
void zmq::tcp_listener_t::process_plug () void zmq::tcp_listener_t::process_plug ()
...@@ -83,6 +84,7 @@ void zmq::tcp_listener_t::process_plug () ...@@ -83,6 +84,7 @@ void zmq::tcp_listener_t::process_plug ()
void zmq::tcp_listener_t::process_term (int linger_) void zmq::tcp_listener_t::process_term (int linger_)
{ {
rm_fd (handle); rm_fd (handle);
handle = (handle_t) NULL;
close (); close ();
own_t::process_term (linger_); own_t::process_term (linger_);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment