Commit 12c4b55a authored by Simon Giesecke's avatar Simon Giesecke

Problem: socks_connecter_t duplicates code with stream_connecter_base_t

Solution: let socks_connecter_t derive from stream_connecter_base_t and remove duplicate code
parent 14da2ab6
...@@ -58,64 +58,23 @@ zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_, ...@@ -58,64 +58,23 @@ zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
address_t *addr_, address_t *addr_,
address_t *proxy_addr_, address_t *proxy_addr_,
bool delayed_start_) : bool delayed_start_) :
own_t (io_thread_, options_), stream_connecter_base_t (
io_object_t (io_thread_), io_thread_, session_, options_, addr_, delayed_start_),
_addr (addr_),
_proxy_addr (proxy_addr_), _proxy_addr (proxy_addr_),
_status (unplugged), _status (unplugged)
_s (retired_fd),
_handle (static_cast<handle_t> (NULL)),
_handle_valid (false),
_delayed_start (delayed_start_),
_timer_started (false),
_session (session_),
_current_reconnect_ivl (options.reconnect_ivl)
{ {
zmq_assert (_addr);
zmq_assert (_addr->protocol == protocol_name::tcp); zmq_assert (_addr->protocol == protocol_name::tcp);
_proxy_addr->to_string (_endpoint); _proxy_addr->to_string (_endpoint);
_socket = _session->get_socket ();
} }
zmq::socks_connecter_t::~socks_connecter_t () zmq::socks_connecter_t::~socks_connecter_t ()
{ {
zmq_assert (_s == retired_fd);
LIBZMQ_DELETE (_proxy_addr); LIBZMQ_DELETE (_proxy_addr);
} }
void zmq::socks_connecter_t::process_plug ()
{
if (_delayed_start)
start_timer ();
else
initiate_connect ();
}
void zmq::socks_connecter_t::process_term (int linger_)
{
switch (_status) {
case unplugged:
break;
case waiting_for_reconnect_time:
cancel_timer (reconnect_timer_id);
break;
case waiting_for_proxy_connection:
case sending_greeting:
case waiting_for_choice:
case sending_request:
case waiting_for_response:
rm_fd (_handle);
if (_s != retired_fd)
close ();
break;
}
own_t::process_term (linger_);
}
void zmq::socks_connecter_t::in_event () void zmq::socks_connecter_t::in_event ()
{ {
zmq_assert (_status != unplugged && _status != waiting_for_reconnect_time); zmq_assert (_status != unplugged);
if (_status == waiting_for_choice) { if (_status == waiting_for_choice) {
int rc = _choice_decoder.input (_s); int rc = _choice_decoder.input (_s);
...@@ -127,7 +86,7 @@ void zmq::socks_connecter_t::in_event () ...@@ -127,7 +86,7 @@ void zmq::socks_connecter_t::in_event ()
if (rc == -1) if (rc == -1)
error (); error ();
else { else {
std::string hostname = ""; std::string hostname;
uint16_t port = 0; uint16_t port = 0;
if (parse_address (_addr->address, hostname, port) == -1) if (parse_address (_addr->address, hostname, port) == -1)
error (); error ();
...@@ -150,26 +109,11 @@ void zmq::socks_connecter_t::in_event () ...@@ -150,26 +109,11 @@ void zmq::socks_connecter_t::in_event ()
if (rc == -1) if (rc == -1)
error (); error ();
else { else {
const endpoint_uri_pair_t endpoint_pair = endpoint_uri_pair_t ( rm_handle ();
get_socket_name<tcp_address_t> (_s, socket_end_local), create_engine (
_endpoint, endpoint_type_connect); _s, get_socket_name<tcp_address_t> (_s, socket_end_local));
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow)
stream_engine_t (_s, options, endpoint_pair);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
send_attach (_session, engine);
_socket->event_connected (endpoint_pair, _s);
rm_fd (_handle);
_s = -1; _s = -1;
_status = unplugged; _status = unplugged;
// Shut the connecter down.
terminate ();
} }
} }
} else } else
...@@ -213,8 +157,10 @@ void zmq::socks_connecter_t::out_event () ...@@ -213,8 +157,10 @@ void zmq::socks_connecter_t::out_event ()
} }
} }
void zmq::socks_connecter_t::initiate_connect () void zmq::socks_connecter_t::start_connecting ()
{ {
zmq_assert (_status == unplugged);
// Open the connecting socket. // Open the connecting socket.
const int rc = connect_to_proxy (); const int rc = connect_to_proxy ();
...@@ -236,7 +182,7 @@ void zmq::socks_connecter_t::initiate_connect () ...@@ -236,7 +182,7 @@ void zmq::socks_connecter_t::initiate_connect ()
else { else {
if (_s != retired_fd) if (_s != retired_fd)
close (); close ();
start_timer (); add_reconnect_timer ();
} }
} }
...@@ -253,13 +199,6 @@ int zmq::socks_connecter_t::process_server_response ( ...@@ -253,13 +199,6 @@ int zmq::socks_connecter_t::process_server_response (
return response_.response_code == 0 ? 0 : -1; return response_.response_code == 0 ? 0 : -1;
} }
void zmq::socks_connecter_t::timer_event (int id_)
{
zmq_assert (_status == waiting_for_reconnect_time);
zmq_assert (id_ == reconnect_timer_id);
initiate_connect ();
}
void zmq::socks_connecter_t::error () void zmq::socks_connecter_t::error ()
{ {
rm_fd (_handle); rm_fd (_handle);
...@@ -268,34 +207,7 @@ void zmq::socks_connecter_t::error () ...@@ -268,34 +207,7 @@ void zmq::socks_connecter_t::error ()
_choice_decoder.reset (); _choice_decoder.reset ();
_request_encoder.reset (); _request_encoder.reset ();
_response_decoder.reset (); _response_decoder.reset ();
start_timer (); add_reconnect_timer ();
}
void zmq::socks_connecter_t::start_timer ()
{
if (options.reconnect_ivl != -1) {
const int interval = get_new_reconnect_ivl ();
add_timer (interval, reconnect_timer_id);
_status = waiting_for_reconnect_time;
_socket->event_connect_retried (
make_unconnected_connect_endpoint_pair (_endpoint), interval);
}
}
int zmq::socks_connecter_t::get_new_reconnect_ivl ()
{
// The new interval is the current interval + random value.
const int interval =
_current_reconnect_ivl + generate_random () % options.reconnect_ivl;
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0
&& options.reconnect_ivl_max > options.reconnect_ivl)
// Calculate the next interval
_current_reconnect_ivl =
std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max);
return interval;
} }
int zmq::socks_connecter_t::connect_to_proxy () int zmq::socks_connecter_t::connect_to_proxy ()
...@@ -439,21 +351,6 @@ zmq::fd_t zmq::socks_connecter_t::check_proxy_connection () ...@@ -439,21 +351,6 @@ zmq::fd_t zmq::socks_connecter_t::check_proxy_connection ()
return 0; return 0;
} }
void zmq::socks_connecter_t::close ()
{
zmq_assert (_s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
const int rc = closesocket (_s);
wsa_assert (rc != SOCKET_ERROR);
#else
const int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (make_unconnected_connect_endpoint_pair (_endpoint),
_s);
_s = retired_fd;
}
int zmq::socks_connecter_t::parse_address (const std::string &address_, int zmq::socks_connecter_t::parse_address (const std::string &address_,
std::string &hostname_, std::string &hostname_,
uint16_t &port_) uint16_t &port_)
......
...@@ -31,8 +31,7 @@ ...@@ -31,8 +31,7 @@
#define __SOCKS_CONNECTER_HPP_INCLUDED__ #define __SOCKS_CONNECTER_HPP_INCLUDED__
#include "fd.hpp" #include "fd.hpp"
#include "io_object.hpp" #include "stream_connecter_base.hpp"
#include "own.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "socks.hpp" #include "socks.hpp"
...@@ -42,8 +41,7 @@ class io_thread_t; ...@@ -42,8 +41,7 @@ class io_thread_t;
class session_base_t; class session_base_t;
struct address_t; struct address_t;
// TODO consider refactoring this to derive from stream_connecter_base_t class socks_connecter_t : public stream_connecter_base_t
class socks_connecter_t : public own_t, public io_object_t
{ {
public: public:
// If 'delayed_start' is true connecter first waits for a while, // If 'delayed_start' is true connecter first waits for a while,
...@@ -68,29 +66,18 @@ class socks_connecter_t : public own_t, public io_object_t ...@@ -68,29 +66,18 @@ class socks_connecter_t : public own_t, public io_object_t
waiting_for_response waiting_for_response
}; };
// ID of the timer used to delay the reconnection.
enum
{
reconnect_timer_id = 1
};
// Method ID // Method ID
enum enum
{ {
socks_no_auth_required = 0 socks_no_auth_required = 0
}; };
// Handlers for incoming commands.
virtual void process_plug ();
virtual void process_term (int linger_);
// Handlers for I/O events. // Handlers for I/O events.
virtual void in_event (); virtual void in_event ();
virtual void out_event (); virtual void out_event ();
virtual void timer_event (int id_);
// Internal function to start the actual connection establishment. // Internal function to start the actual connection establishment.
void initiate_connect (); void start_connecting ();
int process_server_response (const socks_choice_t &response_); int process_server_response (const socks_choice_t &response_);
int process_server_response (const socks_response_t &response_); int process_server_response (const socks_response_t &response_);
...@@ -103,22 +90,11 @@ class socks_connecter_t : public own_t, public io_object_t ...@@ -103,22 +90,11 @@ class socks_connecter_t : public own_t, public io_object_t
void error (); void error ();
// Internal function to start reconnect timer
void start_timer ();
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int get_new_reconnect_ivl ();
// Open TCP connecting socket. Returns -1 in case of error, // Open TCP connecting socket. Returns -1 in case of error,
// 0 if connect was successful immediately. Returns -1 with // 0 if connect was successful immediately. Returns -1 with
// EAGAIN errno if async connect was launched. // EAGAIN errno if async connect was launched.
int open (); int open ();
// Close the connecting socket.
void close ();
// Get the file descriptor of newly created connection. Returns // Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessful. // retired_fd if the connection was unsuccessful.
zmq::fd_t check_proxy_connection (); zmq::fd_t check_proxy_connection ();
...@@ -128,42 +104,11 @@ class socks_connecter_t : public own_t, public io_object_t ...@@ -128,42 +104,11 @@ class socks_connecter_t : public own_t, public io_object_t
socks_request_encoder_t _request_encoder; socks_request_encoder_t _request_encoder;
socks_response_decoder_t _response_decoder; socks_response_decoder_t _response_decoder;
// Address to connect to. Owned by session_base_t.
address_t *_addr;
// SOCKS address; owned by this connecter. // SOCKS address; owned by this connecter.
address_t *_proxy_addr; address_t *_proxy_addr;
int _status; int _status;
// Underlying socket.
fd_t _s;
// Handle corresponding to the listening socket.
handle_t _handle;
// If true file descriptor is registered with the poller and 'handle'
// contains valid value.
bool _handle_valid;
// If true, connecter is waiting a while before trying to connect.
const bool _delayed_start;
// True iff a timer has been started.
bool _timer_started;
// Reference to the session we belong to.
zmq::session_base_t *_session;
// Current reconnect ivl, updated for backoff strategy
int _current_reconnect_ivl;
// String representation of endpoint to connect to
std::string _endpoint;
// Socket
zmq::socket_base_t *_socket;
socks_connecter_t (const socks_connecter_t &); socks_connecter_t (const socks_connecter_t &);
const socks_connecter_t &operator= (const socks_connecter_t &); const socks_connecter_t &operator= (const socks_connecter_t &);
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment