Commit 064c2e08 authored by Constantin Rack's avatar Constantin Rack

Merge pull request #1508 from pijyoi/connect_timeout

add ZMQ_CONNECT_TIMEOUT option
parents 5724b55f c9971e08
...@@ -63,6 +63,20 @@ Default value:: 100 ...@@ -63,6 +63,20 @@ Default value:: 100
Applicable socket types:: all, only for connection-oriented transports Applicable socket types:: all, only for connection-oriented transports
ZMQ_CONNECT_TIMEOUT: Retrieve connect() timeout
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieves how long to wait before timing-out a connect() system call.
The connect() system call normally takes a long time before it returns a
time out error. Setting this option allows the library to time out the call
at an earlier interval.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: 0 (disabled)
Applicable socket types:: all, when using TCP transports.
ZMQ_CURVE_PUBLICKEY: Retrieve current CURVE public key ZMQ_CURVE_PUBLICKEY: Retrieve current CURVE public key
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......
...@@ -109,6 +109,20 @@ Default value:: 0 (false) ...@@ -109,6 +109,20 @@ Default value:: 0 (false)
Applicable socket types:: ZMQ_PULL, ZMQ_PUSH, ZMQ_SUB, ZMQ_PUB, ZMQ_DEALER Applicable socket types:: ZMQ_PULL, ZMQ_PUSH, ZMQ_SUB, ZMQ_PUB, ZMQ_DEALER
ZMQ_CONNECT_TIMEOUT: Set connect() timeout
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets how long to wait before timing-out a connect() system call.
The connect() system call normally takes a long time before it returns a
time out error. Setting this option allows the library to time out the call
at an earlier interval.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: 0 (disabled)
Applicable socket types:: all, when using TCP transports.
ZMQ_CURVE_PUBLICKEY: Set CURVE public key ZMQ_CURVE_PUBLICKEY: Set CURVE public key
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the socket's long term public key. You must set this on CURVE client Sets the socket's long term public key. You must set this on CURVE client
......
...@@ -320,6 +320,7 @@ ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg); ...@@ -320,6 +320,7 @@ ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg);
#define ZMQ_HEARTBEAT_TTL 76 #define ZMQ_HEARTBEAT_TTL 76
#define ZMQ_HEARTBEAT_TIMEOUT 77 #define ZMQ_HEARTBEAT_TIMEOUT 77
#define ZMQ_XPUB_VERBOSE_UNSUBSCRIBE 78 #define ZMQ_XPUB_VERBOSE_UNSUBSCRIBE 78
#define ZMQ_CONNECT_TIMEOUT 79
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
......
...@@ -46,6 +46,7 @@ zmq::options_t::options_t () : ...@@ -46,6 +46,7 @@ zmq::options_t::options_t () :
tos (0), tos (0),
type (-1), type (-1),
linger (-1), linger (-1),
connect_timeout (0),
reconnect_ivl (100), reconnect_ivl (100),
reconnect_ivl_max (0), reconnect_ivl_max (0),
backlog (100), backlog (100),
...@@ -158,6 +159,13 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -158,6 +159,13 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
} }
break; break;
case ZMQ_CONNECT_TIMEOUT:
if (is_int && value >= 0) {
connect_timeout = value;
return 0;
}
break;
case ZMQ_RECONNECT_IVL: case ZMQ_RECONNECT_IVL:
if (is_int && value >= -1) { if (is_int && value >= -1) {
reconnect_ivl = value; reconnect_ivl = value;
...@@ -653,6 +661,13 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) ...@@ -653,6 +661,13 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
} }
break; break;
case ZMQ_CONNECT_TIMEOUT:
if (is_int) {
*value = connect_timeout;
return 0;
}
break;
case ZMQ_RECONNECT_IVL: case ZMQ_RECONNECT_IVL:
if (is_int) { if (is_int) {
*value = reconnect_ivl; *value = reconnect_ivl;
......
...@@ -92,6 +92,11 @@ namespace zmq ...@@ -92,6 +92,11 @@ namespace zmq
// Linger time, in milliseconds. // Linger time, in milliseconds.
int linger; int linger;
// Maximum interval in milliseconds beyond which userspace will
// timeout connect().
// Default 0 (unused)
int connect_timeout;
// Minimum interval between attempts to reconnect, in milliseconds. // Minimum interval between attempts to reconnect, in milliseconds.
// Default 100ms // Default 100ms
int reconnect_ivl; int reconnect_ivl;
......
...@@ -67,7 +67,8 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, ...@@ -67,7 +67,8 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
s (retired_fd), s (retired_fd),
handle_valid (false), handle_valid (false),
delayed_start (delayed_start_), delayed_start (delayed_start_),
timer_started (false), connect_timer_started (false),
reconnect_timer_started (false),
session (session_), session (session_),
current_reconnect_ivl (options.reconnect_ivl) current_reconnect_ivl (options.reconnect_ivl)
{ {
...@@ -79,7 +80,8 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, ...@@ -79,7 +80,8 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
zmq::tcp_connecter_t::~tcp_connecter_t () zmq::tcp_connecter_t::~tcp_connecter_t ()
{ {
zmq_assert (!timer_started); zmq_assert (!connect_timer_started);
zmq_assert (!reconnect_timer_started);
zmq_assert (!handle_valid); zmq_assert (!handle_valid);
zmq_assert (s == retired_fd); zmq_assert (s == retired_fd);
} }
...@@ -94,9 +96,14 @@ void zmq::tcp_connecter_t::process_plug () ...@@ -94,9 +96,14 @@ void zmq::tcp_connecter_t::process_plug ()
void zmq::tcp_connecter_t::process_term (int linger_) void zmq::tcp_connecter_t::process_term (int linger_)
{ {
if (timer_started) { if (connect_timer_started) {
cancel_timer (connect_timer_id);
connect_timer_started = false;
}
if (reconnect_timer_started) {
cancel_timer (reconnect_timer_id); cancel_timer (reconnect_timer_id);
timer_started = false; reconnect_timer_started = false;
} }
if (handle_valid) { if (handle_valid) {
...@@ -120,6 +127,11 @@ void zmq::tcp_connecter_t::in_event () ...@@ -120,6 +127,11 @@ void zmq::tcp_connecter_t::in_event ()
void zmq::tcp_connecter_t::out_event () void zmq::tcp_connecter_t::out_event ()
{ {
if (connect_timer_started) {
cancel_timer (connect_timer_id);
connect_timer_started = false;
}
rm_fd (handle); rm_fd (handle);
handle_valid = false; handle_valid = false;
...@@ -153,9 +165,20 @@ void zmq::tcp_connecter_t::out_event () ...@@ -153,9 +165,20 @@ void zmq::tcp_connecter_t::out_event ()
void zmq::tcp_connecter_t::timer_event (int id_) void zmq::tcp_connecter_t::timer_event (int id_)
{ {
zmq_assert (id_ == reconnect_timer_id); zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
timer_started = false; if (id_ == connect_timer_id) {
connect_timer_started = false;
rm_fd (handle);
handle_valid = false;
close ();
add_reconnect_timer ();
}
else if (id_ == reconnect_timer_id) {
reconnect_timer_started = false;
start_connecting (); start_connecting ();
}
} }
void zmq::tcp_connecter_t::start_connecting () void zmq::tcp_connecter_t::start_connecting ()
...@@ -177,6 +200,9 @@ void zmq::tcp_connecter_t::start_connecting () ...@@ -177,6 +200,9 @@ void zmq::tcp_connecter_t::start_connecting ()
handle_valid = true; handle_valid = true;
set_pollout (handle); set_pollout (handle);
socket->event_connect_delayed (endpoint, zmq_errno()); socket->event_connect_delayed (endpoint, zmq_errno());
// add userspace connect timeout
add_connect_timer ();
} }
// Handle any other error condition by eventual reconnect. // Handle any other error condition by eventual reconnect.
...@@ -187,12 +213,20 @@ void zmq::tcp_connecter_t::start_connecting () ...@@ -187,12 +213,20 @@ void zmq::tcp_connecter_t::start_connecting ()
} }
} }
void zmq::tcp_connecter_t::add_connect_timer ()
{
if (options.connect_timeout > 0) {
add_timer (options.connect_timeout, connect_timer_id);
connect_timer_started = true;
}
}
void zmq::tcp_connecter_t::add_reconnect_timer () void zmq::tcp_connecter_t::add_reconnect_timer ()
{ {
const int interval = get_new_reconnect_ivl (); const int interval = get_new_reconnect_ivl ();
add_timer (interval, reconnect_timer_id); add_timer (interval, reconnect_timer_id);
socket->event_connect_retried (endpoint, interval); socket->event_connect_retried (endpoint, interval);
timer_started = true; reconnect_timer_started = true;
} }
int zmq::tcp_connecter_t::get_new_reconnect_ivl () int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
......
...@@ -57,7 +57,7 @@ namespace zmq ...@@ -57,7 +57,7 @@ namespace zmq
private: private:
// ID of the timer used to delay the reconnection. // ID of the timer used to delay the reconnection.
enum {reconnect_timer_id = 1}; enum {reconnect_timer_id = 1, connect_timer_id};
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
...@@ -71,6 +71,9 @@ namespace zmq ...@@ -71,6 +71,9 @@ namespace zmq
// Internal function to start the actual connection establishment. // Internal function to start the actual connection establishment.
void start_connecting (); void start_connecting ();
// Internal function to add a connect timer
void add_connect_timer();
// Internal function to add a reconnect timer // Internal function to add a reconnect timer
void add_reconnect_timer(); void add_reconnect_timer();
...@@ -108,7 +111,8 @@ namespace zmq ...@@ -108,7 +111,8 @@ namespace zmq
const bool delayed_start; const bool delayed_start;
// True iff a timer has been started. // True iff a timer has been started.
bool timer_started; bool connect_timer_started;
bool reconnect_timer_started;
// Reference to the session we belong to. // Reference to the session we belong to.
zmq::session_base_t *session; zmq::session_base_t *session;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment