Unverified Commit debbe08f authored by Bill Torpey's avatar Bill Torpey Committed by GitHub

add option to stop trying to reconnect on ECONNREFUSED (#3831)

* add option to stop trying to reconnect on ECONNREFUSED
parent 66ee3ee4
......@@ -1039,7 +1039,8 @@ test_apps += tests/test_poller \
tests/test_app_meta \
tests/test_xpub_manual_last_value \
tests/test_router_notify \
tests/test_peer
tests/test_peer \
tests/test_reconnect_options
tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
......@@ -1084,6 +1085,10 @@ tests_test_router_notify_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_peer_SOURCES = tests/test_peer.cpp
tests_test_peer_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_peer_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_reconnect_options_SOURCES = tests/test_reconnect_options.cpp
tests_test_reconnect_options_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_reconnect_options_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
endif
if ENABLE_STATIC
......
......@@ -626,6 +626,22 @@ Default value:: 0 (only use ZMQ_RECONNECT_IVL)
Applicable socket types:: all, only for connection-oriented transport
ZMQ_RECONNECT_STOP: Retrieve condition where reconnection will stop
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECONNECT_STOP' option shall retrieve the conditions under which
automatic reconnection will stop.
The 'ZMQ_RECONNECT_STOP_CONN_REFUSED' option will stop reconnection when 0MQ
receives the ECONNREFUSED return code from the connect. This indicates that
there is no code bound to the specified endpoint.
[horizontal]
Option value type:: int
Option value unit:: 'ZMQ_RECONNECT_STOP_CONN_REFUSED'
Default value:: 0
Applicable socket types:: all, only for connection-oriented transports
ZMQ_RECOVERY_IVL: Get multicast recovery interval
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECOVERY_IVL' option shall retrieve the recovery interval for
......
......@@ -701,6 +701,23 @@ Default value:: 0 (only use ZMQ_RECONNECT_IVL)
Applicable socket types:: all, only for connection-oriented transports
ZMQ_RECONNECT_STOP: Set condition where reconnection will stop
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECONNECT_STOP' option shall set the conditions under which automatic
reconnection will stop. This can be useful when a process binds to a
wild-card port, where the OS supplies an ephemeral port.
The 'ZMQ_RECONNECT_STOP_CONN_REFUSED' option will stop reconnection when 0MQ
receives the ECONNREFUSED return code from the connect. This indicates that
there is no code bound to the specified endpoint.
[horizontal]
Option value type:: int
Option value unit:: ZMQ_RECONNECT_STOP_CONN_REFUSED
Default value:: 0
Applicable socket types:: all, only for connection-oriented transports
ZMQ_RECOVERY_IVL: Set multicast recovery interval
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECOVERY_IVL' option shall set the recovery interval for multicast
......
......@@ -677,7 +677,10 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_WSS_HOSTNAME 106
#define ZMQ_WSS_TRUST_SYSTEM 107
#define ZMQ_ONLY_FIRST_SUBSCRIBE 108
#define ZMQ_RECONNECT_STOP 109
/* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1
/* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10
......
......@@ -74,6 +74,7 @@ __declspec(align (64))
reap,
reaped,
inproc_connected,
conn_failed,
pipe_peer_stats,
pipe_stats_publish,
done
......
......@@ -161,6 +161,10 @@ void zmq::object_t::process_command (const command_t &cmd_)
process_seqnum ();
break;
case command_t::conn_failed:
process_conn_failed ();
break;
case command_t::done:
default:
zmq_assert (false);
......@@ -257,6 +261,14 @@ void zmq::object_t::send_attach (session_base_t *destination_,
send_command (cmd);
}
void zmq::object_t::send_conn_failed (session_base_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::conn_failed;
send_command (cmd);
}
void zmq::object_t::send_bind (own_t *destination_,
pipe_t *pipe_,
bool inc_seqnum_)
......@@ -528,6 +540,11 @@ void zmq::object_t::process_seqnum ()
zmq_assert (false);
}
void zmq::object_t::process_conn_failed ()
{
zmq_assert (false);
}
void zmq::object_t::send_command (const command_t &cmd_)
{
_ctx->send_command (cmd_.destination->get_tid (), cmd_);
......
......@@ -117,6 +117,8 @@ class object_t
void send_reap (zmq::socket_base_t *socket_);
void send_reaped ();
void send_done ();
void send_conn_failed (zmq::session_base_t *destination_);
// These handlers can be overridden by the derived objects. They are
// called when command arrives from another thread.
......@@ -144,6 +146,8 @@ class object_t
virtual void process_term_endpoint (std::string *endpoint_);
virtual void process_reap (zmq::socket_base_t *socket_);
virtual void process_reaped ();
virtual void process_conn_failed ();
// Special handler called after a command that requires a seqnum
// was processed. The implementation should catch up with its counter
......
......@@ -210,6 +210,7 @@ zmq::options_t::options_t () :
linger (-1),
connect_timeout (0),
tcp_maxrt (0),
reconnect_stop (0),
reconnect_ivl (100),
reconnect_ivl_max (0),
backlog (100),
......@@ -393,6 +394,13 @@ int zmq::options_t::setsockopt (int option_,
}
break;
case ZMQ_RECONNECT_STOP:
if (is_int) {
reconnect_stop = value;
return 0;
}
break;
case ZMQ_RECONNECT_IVL:
if (is_int && value >= -1) {
reconnect_ivl = value;
......@@ -933,6 +941,13 @@ int zmq::options_t::getsockopt (int option_,
}
break;
case ZMQ_RECONNECT_STOP:
if (is_int) {
*value = reconnect_stop;
return 0;
}
break;
case ZMQ_RECONNECT_IVL:
if (is_int) {
*value = reconnect_ivl;
......
......@@ -116,6 +116,10 @@ struct options_t
// Default 0 (unused)
int tcp_maxrt;
// Disable reconnect under certain conditions
// Default 0
int reconnect_stop;
// Minimum interval between attempts to reconnect, in milliseconds.
// Default 100ms
int reconnect_ivl;
......
......@@ -523,6 +523,13 @@ void zmq::session_base_t::timer_event (int id_)
_pipe->terminate (false);
}
void zmq::session_base_t::process_conn_failed ()
{
std::string *ep = new (std::string);
_addr->to_string (*ep);
send_term_endpoint (_socket, ep);
}
void zmq::session_base_t::reconnect ()
{
// For delayed connect situations, terminate the pipe
......
......@@ -141,6 +141,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
void process_plug () ZMQ_FINAL;
void process_attach (zmq::i_engine *engine_) ZMQ_FINAL;
void process_term (int linger_) ZMQ_FINAL;
void process_conn_failed ();
// i_poll_events handlers.
void timer_event (int id_) ZMQ_FINAL;
......
......@@ -55,10 +55,10 @@ zmq::stream_connecter_base_t::stream_connecter_base_t (
_s (retired_fd),
_handle (static_cast<handle_t> (NULL)),
_socket (session_->get_socket ()),
_session (session_),
_delayed_start (delayed_start_),
_reconnect_timer_started (false),
_current_reconnect_ivl (options.reconnect_ivl)
_current_reconnect_ivl (options.reconnect_ivl),
_session (session_)
{
zmq_assert (_addr);
_addr->to_string (_endpoint);
......
......@@ -91,9 +91,6 @@ class stream_connecter_base_t : public own_t, public io_object_t
// Socket
zmq::socket_base_t *const _socket;
// Reference to the session we belong to.
zmq::session_base_t *const _session;
private:
// ID of the timer used to delay the reconnection.
enum
......@@ -118,6 +115,10 @@ class stream_connecter_base_t : public own_t, public io_object_t
int _current_reconnect_ivl;
ZMQ_NON_COPYABLE_NOR_MOVABLE (stream_connecter_base_t)
protected:
// Reference to the session we belong to.
zmq::session_base_t *const _session;
};
}
......
......@@ -103,6 +103,15 @@ void zmq::tcp_connecter_t::out_event ()
const fd_t fd = connect ();
if (fd == retired_fd
&& ((options.reconnect_stop & ZMQ_RECONNECT_STOP_CONN_REFUSED)
&& errno == ECONNREFUSED)) {
send_conn_failed (_session);
close ();
terminate ();
return;
}
// Handle the error condition by attempt to reconnect.
if (fd == retired_fd || !tune_socket (fd)) {
close ();
......@@ -266,6 +275,7 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
|| err == WSAENOBUFS) {
wsa_assert_no (err);
}
errno = wsa_error_to_errno (err);
return retired_fd;
}
#else
......
......@@ -64,7 +64,10 @@
#define ZMQ_WSS_HOSTNAME 106
#define ZMQ_WSS_TRUST_SYSTEM 107
#define ZMQ_ONLY_FIRST_SUBSCRIBE 108
#define ZMQ_RECONNECT_STOP 109
/* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1
/* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10
......
......@@ -163,6 +163,7 @@ if(ENABLE_DRAFTS)
test_router_notify
test_xpub_manual_last_value
test_peer
test_reconnect_options
)
endif()
......@@ -292,7 +293,6 @@ if(ZMQ_HAVE_CURVE AND NOT ZMQ_USE_TWEETNACL)
endif()
set_tests_properties(test_security_zap PROPERTIES TIMEOUT 60)
set_tests_properties(test_reconnect_ivl PROPERTIES TIMEOUT 15)
#Check whether all tests in the current folder are present
......
/*
Copyright (c) 2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include "testutil.hpp"
#include "testutil_unity.hpp"
#include "testutil_monitoring.hpp"
#include <unity.h>
// test behavior with (mostly) default values
void reconnect_default ()
{
// setup pub socket
void *pub = test_context_socket (ZMQ_PUB);
// Bind pub socket
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, ENDPOINT_0));
// setup sub socket
void *sub = test_context_socket (ZMQ_SUB);
// Monitor all events on sub
TEST_ASSERT_SUCCESS_ERRNO (
zmq_socket_monitor (sub, "inproc://monitor-sub", ZMQ_EVENT_ALL));
// Create socket for collecting monitor events
void *sub_mon = test_context_socket (ZMQ_PAIR);
// Connect so they'll get events
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_mon, "inproc://monitor-sub"));
// set reconnect interval so only a single reconnect is tried
int interval = 60 * 1000;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub, ZMQ_RECONNECT_IVL, &interval, sizeof (interval)));
// connect to pub
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, ENDPOINT_0));
// confirm that we get following events
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECT_DELAYED);
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECTED);
expect_monitor_event (sub_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
// close the pub socket
test_context_socket_close_zero_linger (pub);
// confirm that we get following events
expect_monitor_event (sub_mon, ZMQ_EVENT_DISCONNECTED);
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECT_RETRIED);
// ZMQ_EVENT_CONNECT_RETRIED should be last event, because of timeout set above
int event;
char *event_address;
int rc = get_monitor_event_with_timeout (sub_mon, &event, &event_address,
2 * 1000);
assert (rc == -1);
// Close sub
// TODO why does this use zero_linger?
test_context_socket_close_zero_linger (sub);
// Close monitor
// TODO why does this use zero_linger?
test_context_socket_close_zero_linger (sub_mon);
}
// test successful reconnect
void reconnect_success ()
{
// setup pub socket
void *pub = test_context_socket (ZMQ_PUB);
// Bind pub socket
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, ENDPOINT_0));
// setup sub socket
void *sub = test_context_socket (ZMQ_SUB);
// Monitor all events on sub
TEST_ASSERT_SUCCESS_ERRNO (
zmq_socket_monitor (sub, "inproc://monitor-sub", ZMQ_EVENT_ALL));
// Create socket for collecting monitor events
void *sub_mon = test_context_socket (ZMQ_PAIR);
// Connect so they'll get events
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_mon, "inproc://monitor-sub"));
// set reconnect interval so only a single reconnect is tried
int interval = 1 * 1000;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub, ZMQ_RECONNECT_IVL, &interval, sizeof (interval)));
// connect to pub
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, ENDPOINT_0));
// confirm that we get following events
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECT_DELAYED);
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECTED);
expect_monitor_event (sub_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
// close the pub socket
test_context_socket_close_zero_linger (pub);
// confirm that we get following events
expect_monitor_event (sub_mon, ZMQ_EVENT_DISCONNECTED);
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECT_RETRIED);
// ZMQ_EVENT_CONNECT_RETRIED should be last event, because of timeout set above
int event;
char *event_address;
int rc = get_monitor_event_with_timeout (sub_mon, &event, &event_address,
SETTLE_TIME);
assert (rc == -1);
// Now re-bind pub socket and wait for re-connect
pub = test_context_socket (ZMQ_PUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, ENDPOINT_0));
msleep (SETTLE_TIME);
// confirm that we get following events
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECT_DELAYED);
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECTED);
expect_monitor_event (sub_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
// ZMQ_EVENT_HANDSHAKE_SUCCEEDED should be last event
rc = get_monitor_event_with_timeout (sub_mon, &event, &event_address,
SETTLE_TIME);
assert (rc == -1);
// Close sub
// TODO why does this use zero_linger?
test_context_socket_close_zero_linger (sub);
test_context_socket_close_zero_linger (pub);
// Close monitor
// TODO why does this use zero_linger?
test_context_socket_close_zero_linger (sub_mon);
}
// test stopping reconnect on connection refused
void reconnect_stop_on_refused ()
{
// setup pub socket
void *pub = test_context_socket (ZMQ_PUB);
// Bind pub socket
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, ENDPOINT_0));
// setup sub socket
void *sub = test_context_socket (ZMQ_SUB);
// Monitor all events on sub
TEST_ASSERT_SUCCESS_ERRNO (
zmq_socket_monitor (sub, "inproc://monitor-sub", ZMQ_EVENT_ALL));
// Create socket for collecting monitor events
void *sub_mon = test_context_socket (ZMQ_PAIR);
// Connect so they'll get events
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_mon, "inproc://monitor-sub"));
// set option to stop reconnecting on error
int stopReconnectOnError = ZMQ_RECONNECT_STOP_CONN_REFUSED;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_RECONNECT_STOP,
&stopReconnectOnError,
sizeof (stopReconnectOnError)));
// connect to pub
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, ENDPOINT_0));
// confirm that we get following events
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECT_DELAYED);
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECTED);
expect_monitor_event (sub_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
// close the pub socket
test_context_socket_close_zero_linger (pub);
// confirm that we get following events
expect_monitor_event (sub_mon, ZMQ_EVENT_DISCONNECTED);
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECT_RETRIED);
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECT_DELAYED);
expect_monitor_event (sub_mon, ZMQ_EVENT_CLOSED);
// ZMQ_EVENT_CLOSED should be last event, because of ZMQ_RECONNECT_STOP set above
int event = 0;
char *event_address;
int rc = get_monitor_event_with_timeout (sub_mon, &event, &event_address,
2 * 1000);
int limit = 0;
while ((rc != -1) && (++limit < 1000)) {
print_unexpected_event_stderr(event, rc, 0, -1);
rc = get_monitor_event_with_timeout (sub_mon, &event, &event_address,
2 * 1000);
}
// Close sub
// TODO why does this use zero_linger?
test_context_socket_close_zero_linger (sub);
// Close monitor
// TODO why does this use zero_linger?
test_context_socket_close_zero_linger (sub_mon);
}
void setUp ()
{
setup_test_context ();
}
void tearDown ()
{
teardown_test_context ();
}
int main (void)
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (reconnect_default);
RUN_TEST (reconnect_success);
RUN_TEST (reconnect_stop_on_refused);
return UNITY_END ();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment