Commit 1a230e89 authored by Simon Giesecke's avatar Simon Giesecke

Problem: process_plug, get_new_reconnect_ivl and add_reconnect_timer duplicated…

Problem: process_plug, get_new_reconnect_ivl and add_reconnect_timer duplicated across subclasses of stream_connector_base_t

Solution: pull up to stream_connector_base_t
parent 74667ebc
...@@ -62,14 +62,6 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, ...@@ -62,14 +62,6 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
zmq_assert (_addr->protocol == protocol_name::ipc); zmq_assert (_addr->protocol == protocol_name::ipc);
} }
void zmq::ipc_connecter_t::process_plug ()
{
if (_delayed_start)
add_reconnect_timer ();
else
start_connecting ();
}
void zmq::ipc_connecter_t::process_term (int linger_) void zmq::ipc_connecter_t::process_term (int linger_)
{ {
if (_reconnect_timer_started) { if (_reconnect_timer_started) {
...@@ -145,6 +137,10 @@ void zmq::ipc_connecter_t::start_connecting () ...@@ -145,6 +137,10 @@ void zmq::ipc_connecter_t::start_connecting ()
_handle = add_fd (_s); _handle = add_fd (_s);
set_pollout (_handle); set_pollout (_handle);
_socket->event_connect_delayed (_endpoint, zmq_errno ()); _socket->event_connect_delayed (_endpoint, zmq_errno ());
// TODO, tcp_connecter_t adds a connect timer in this case; maybe this
// should be done here as well (and then this could be pulled up to
// stream_connecter_base_t).
} }
// Handle any other error condition by eventual reconnect. // Handle any other error condition by eventual reconnect.
...@@ -155,35 +151,6 @@ void zmq::ipc_connecter_t::start_connecting () ...@@ -155,35 +151,6 @@ void zmq::ipc_connecter_t::start_connecting ()
} }
} }
void zmq::ipc_connecter_t::add_reconnect_timer ()
{
if (options.reconnect_ivl != -1) {
int rc_ivl = get_new_reconnect_ivl ();
add_timer (rc_ivl, reconnect_timer_id);
_socket->event_connect_retried (_endpoint, rc_ivl);
_reconnect_timer_started = true;
}
}
int zmq::ipc_connecter_t::get_new_reconnect_ivl ()
{
// The new interval is the current interval + random value.
int this_interval =
_current_reconnect_ivl + (generate_random () % options.reconnect_ivl);
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0
&& options.reconnect_ivl_max > options.reconnect_ivl) {
// Calculate the next interval
_current_reconnect_ivl = _current_reconnect_ivl * 2;
if (_current_reconnect_ivl >= options.reconnect_ivl_max) {
_current_reconnect_ivl = options.reconnect_ivl_max;
}
}
return this_interval;
}
int zmq::ipc_connecter_t::open () int zmq::ipc_connecter_t::open ()
{ {
zmq_assert (_s == retired_fd); zmq_assert (_s == retired_fd);
......
...@@ -57,7 +57,6 @@ class ipc_connecter_t : public stream_connecter_base_t ...@@ -57,7 +57,6 @@ class ipc_connecter_t : public stream_connecter_base_t
}; };
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug ();
void process_term (int linger_); void process_term (int linger_);
// Handlers for I/O events. // Handlers for I/O events.
...@@ -68,14 +67,6 @@ class ipc_connecter_t : public stream_connecter_base_t ...@@ -68,14 +67,6 @@ class ipc_connecter_t : public stream_connecter_base_t
// Internal function to start the actual connection establishment. // Internal function to start the actual connection establishment.
void start_connecting (); void start_connecting ();
// Internal function to add a reconnect timer
void add_reconnect_timer ();
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int get_new_reconnect_ivl ();
// Open IPC connecting socket. Returns -1 in case of error, // Open IPC connecting socket. Returns -1 in case of error,
// 0 if connect was successful immediately. Returns -1 with // 0 if connect was successful immediately. Returns -1 with
// EAGAIN errno if async connect was launched. // EAGAIN errno if async connect was launched.
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "stream_connecter_base.hpp" #include "stream_connecter_base.hpp"
#include "session_base.hpp" #include "session_base.hpp"
#include "address.hpp" #include "address.hpp"
#include "random.hpp"
zmq::stream_connecter_base_t::stream_connecter_base_t ( zmq::stream_connecter_base_t::stream_connecter_base_t (
zmq::io_thread_t *io_thread_, zmq::io_thread_t *io_thread_,
...@@ -62,3 +63,37 @@ zmq::stream_connecter_base_t::~stream_connecter_base_t () ...@@ -62,3 +63,37 @@ zmq::stream_connecter_base_t::~stream_connecter_base_t ()
zmq_assert (!_handle); zmq_assert (!_handle);
zmq_assert (_s == retired_fd); zmq_assert (_s == retired_fd);
} }
void zmq::stream_connecter_base_t::process_plug ()
{
if (_delayed_start)
add_reconnect_timer ();
else
start_connecting ();
}
void zmq::stream_connecter_base_t::add_reconnect_timer ()
{
if (options.reconnect_ivl != -1) {
const int interval = get_new_reconnect_ivl ();
add_timer (interval, reconnect_timer_id);
_socket->event_connect_retried (_endpoint, interval);
_reconnect_timer_started = true;
}
}
int zmq::stream_connecter_base_t::get_new_reconnect_ivl ()
{
// The new interval is the current interval + random value.
const int interval =
_current_reconnect_ivl + generate_random () % options.reconnect_ivl;
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0
&& options.reconnect_ivl_max > options.reconnect_ivl)
// Calculate the next interval
_current_reconnect_ivl =
std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max);
return interval;
}
...@@ -53,8 +53,28 @@ class stream_connecter_base_t : public own_t, public io_object_t ...@@ -53,8 +53,28 @@ class stream_connecter_base_t : public own_t, public io_object_t
~stream_connecter_base_t (); ~stream_connecter_base_t ();
private:
// Handlers for incoming commands.
void process_plug ();
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int get_new_reconnect_ivl ();
virtual void start_connecting () = 0;
// TODO check if some members can be made private
protected: protected:
// ID of the timer used to delay the reconnection.
enum
{
reconnect_timer_id = 1
};
// Internal function to add a reconnect timer
void add_reconnect_timer ();
// Address to connect to. Owned by session_base_t. // Address to connect to. Owned by session_base_t.
// It is non-const since some parts may change during opening. // It is non-const since some parts may change during opening.
address_t *const _addr; address_t *const _addr;
......
...@@ -35,7 +35,6 @@ ...@@ -35,7 +35,6 @@
#include "tcp_connecter.hpp" #include "tcp_connecter.hpp"
#include "stream_engine.hpp" #include "stream_engine.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "random.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
#include "tcp.hpp" #include "tcp.hpp"
...@@ -81,14 +80,6 @@ zmq::tcp_connecter_t::~tcp_connecter_t () ...@@ -81,14 +80,6 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()
zmq_assert (!_connect_timer_started); zmq_assert (!_connect_timer_started);
} }
void zmq::tcp_connecter_t::process_plug ()
{
if (_delayed_start)
add_reconnect_timer ();
else
start_connecting ();
}
void zmq::tcp_connecter_t::process_term (int linger_) void zmq::tcp_connecter_t::process_term (int linger_)
{ {
if (_connect_timer_started) { if (_connect_timer_started) {
...@@ -208,32 +199,6 @@ void zmq::tcp_connecter_t::add_connect_timer () ...@@ -208,32 +199,6 @@ void zmq::tcp_connecter_t::add_connect_timer ()
} }
} }
void zmq::tcp_connecter_t::add_reconnect_timer ()
{
if (options.reconnect_ivl != -1) {
const int interval = get_new_reconnect_ivl ();
add_timer (interval, reconnect_timer_id);
_socket->event_connect_retried (_endpoint, interval);
_reconnect_timer_started = true;
}
}
int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
{
// The new interval is the current interval + random value.
const int interval =
_current_reconnect_ivl + generate_random () % options.reconnect_ivl;
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0
&& options.reconnect_ivl_max > options.reconnect_ivl)
// Calculate the next interval
_current_reconnect_ivl =
std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max);
return interval;
}
int zmq::tcp_connecter_t::open () int zmq::tcp_connecter_t::open ()
{ {
zmq_assert (_s == retired_fd); zmq_assert (_s == retired_fd);
......
...@@ -49,15 +49,13 @@ class tcp_connecter_t : public stream_connecter_base_t ...@@ -49,15 +49,13 @@ class tcp_connecter_t : public stream_connecter_base_t
~tcp_connecter_t (); ~tcp_connecter_t ();
private: private:
// ID of the timer used to delay the reconnection. // ID of the timer used to check the connect timeout, must be different from stream_connecter_base_t::reconnect_timer_id.
enum enum
{ {
reconnect_timer_id = 1, connect_timer_id = 2
connect_timer_id
}; };
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug ();
void process_term (int linger_); void process_term (int linger_);
// Handlers for I/O events. // Handlers for I/O events.
...@@ -74,14 +72,6 @@ class tcp_connecter_t : public stream_connecter_base_t ...@@ -74,14 +72,6 @@ class tcp_connecter_t : public stream_connecter_base_t
// Internal function to add a connect timer // Internal function to add a connect timer
void add_connect_timer (); void add_connect_timer ();
// Internal function to add a reconnect timer
void add_reconnect_timer ();
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int get_new_reconnect_ivl ();
// Open TCP connecting socket. Returns -1 in case of error, // Open TCP connecting socket. Returns -1 in case of error,
// 0 if connect was successful immediately. Returns -1 with // 0 if connect was successful immediately. Returns -1 with
// EAGAIN errno if async connect was launched. // EAGAIN errno if async connect was launched.
......
...@@ -64,14 +64,6 @@ zmq::tipc_connecter_t::tipc_connecter_t (class io_thread_t *io_thread_, ...@@ -64,14 +64,6 @@ zmq::tipc_connecter_t::tipc_connecter_t (class io_thread_t *io_thread_,
zmq_assert (_addr->protocol == "tipc"); zmq_assert (_addr->protocol == "tipc");
} }
void zmq::tipc_connecter_t::process_plug ()
{
if (_delayed_start)
add_reconnect_timer ();
else
start_connecting ();
}
void zmq::tipc_connecter_t::process_term (int linger_) void zmq::tipc_connecter_t::process_term (int linger_)
{ {
if (_reconnect_timer_started) { if (_reconnect_timer_started) {
...@@ -157,35 +149,6 @@ void zmq::tipc_connecter_t::start_connecting () ...@@ -157,35 +149,6 @@ void zmq::tipc_connecter_t::start_connecting ()
} }
} }
void zmq::tipc_connecter_t::add_reconnect_timer ()
{
if (options.reconnect_ivl != -1) {
int rc_ivl = get_new_reconnect_ivl ();
add_timer (rc_ivl, reconnect_timer_id);
_socket->event_connect_retried (_endpoint, rc_ivl);
_reconnect_timer_started = true;
}
}
int zmq::tipc_connecter_t::get_new_reconnect_ivl ()
{
// The new interval is the current interval + random value.
int this_interval =
_current_reconnect_ivl + (generate_random () % options.reconnect_ivl);
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0
&& options.reconnect_ivl_max > options.reconnect_ivl) {
// Calculate the next interval
_current_reconnect_ivl = _current_reconnect_ivl * 2;
if (_current_reconnect_ivl >= options.reconnect_ivl_max) {
_current_reconnect_ivl = options.reconnect_ivl_max;
}
}
return this_interval;
}
int zmq::tipc_connecter_t::open () int zmq::tipc_connecter_t::open ()
{ {
zmq_assert (_s == retired_fd); zmq_assert (_s == retired_fd);
......
...@@ -58,7 +58,6 @@ class tipc_connecter_t : public stream_connecter_base_t ...@@ -58,7 +58,6 @@ class tipc_connecter_t : public stream_connecter_base_t
}; };
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug ();
void process_term (int linger_); void process_term (int linger_);
// Handlers for I/O events. // Handlers for I/O events.
...@@ -69,9 +68,6 @@ class tipc_connecter_t : public stream_connecter_base_t ...@@ -69,9 +68,6 @@ class tipc_connecter_t : public stream_connecter_base_t
// Internal function to start the actual connection establishment. // Internal function to start the actual connection establishment.
void start_connecting (); void start_connecting ();
// Internal function to add a reconnect timer
void add_reconnect_timer ();
// Close the connecting socket. // Close the connecting socket.
void close (); void close ();
...@@ -79,11 +75,6 @@ class tipc_connecter_t : public stream_connecter_base_t ...@@ -79,11 +75,6 @@ class tipc_connecter_t : public stream_connecter_base_t
// retired_fd if the connection was unsuccessful. // retired_fd if the connection was unsuccessful.
fd_t connect (); fd_t connect ();
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int get_new_reconnect_ivl ();
// Open IPC connecting socket. Returns -1 in case of error, // Open IPC connecting socket. Returns -1 in case of error,
// 0 if connect was successful immediately. Returns -1 with // 0 if connect was successful immediately. Returns -1 with
// EAGAIN errno if async connect was launched. // EAGAIN errno if async connect was launched.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment