Commit 5d5263ed authored by Jake Cobb's avatar Jake Cobb Committed by Luca Boccassi

Prevent DOS by asserts in TCP tuning (#2492)

* Prevent DOS by asserts in TCP tuning

-Propagates socket option errors from the
tuning functions to the callers.
-Asserts a subset of error conditions during tuning,
excluding external network causes.
-Checks tuning results in 3 call sites and treats
them like failures to connect, accept, etc.

* Fix variable name

* Remove lambda requiring C++11
parent 1d58a009
...@@ -390,7 +390,7 @@ zmq::fd_t zmq::socks_connecter_t::check_proxy_connection () ...@@ -390,7 +390,7 @@ zmq::fd_t zmq::socks_connecter_t::check_proxy_connection ()
socklen_t len = sizeof err; socklen_t len = sizeof err;
#endif #endif
const int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len); int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
// Assert if the error was caused by 0MQ bug. // Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert. // Networking problems are OK. No need to assert.
...@@ -427,9 +427,11 @@ zmq::fd_t zmq::socks_connecter_t::check_proxy_connection () ...@@ -427,9 +427,11 @@ zmq::fd_t zmq::socks_connecter_t::check_proxy_connection ()
} }
#endif #endif
tune_tcp_socket (s); rc = tune_tcp_socket (s);
tune_tcp_keepalives (s, options.tcp_keepalive, options.tcp_keepalive_cnt, rc = rc | tune_tcp_keepalives (s, options.tcp_keepalive, options.tcp_keepalive_cnt,
options.tcp_keepalive_idle, options.tcp_keepalive_intvl); options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
if (rc != 0)
return -1;
return 0; return 0;
} }
......
...@@ -45,7 +45,7 @@ ...@@ -45,7 +45,7 @@
#include <ioctl.h> #include <ioctl.h>
#endif #endif
void zmq::tune_tcp_socket (fd_t s_) int zmq::tune_tcp_socket (fd_t s_)
{ {
// Disable Nagle's algorithm. We are doing data batching on 0MQ level, // Disable Nagle's algorithm. We are doing data batching on 0MQ level,
// so using Nagle wouldn't improve throughput in anyway, but it would // so using Nagle wouldn't improve throughput in anyway, but it would
...@@ -53,44 +53,37 @@ void zmq::tune_tcp_socket (fd_t s_) ...@@ -53,44 +53,37 @@ void zmq::tune_tcp_socket (fd_t s_)
int nodelay = 1; int nodelay = 1;
int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay, int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay,
sizeof (int)); sizeof (int));
#ifdef ZMQ_HAVE_WINDOWS tcp_assert_tuning_error(s_, rc);
wsa_assert (rc != SOCKET_ERROR); if (rc != 0)
#else return rc;
errno_assert (rc == 0);
#endif
#ifdef ZMQ_HAVE_OPENVMS #ifdef ZMQ_HAVE_OPENVMS
// Disable delayed acknowledgements as they hurt latency significantly. // Disable delayed acknowledgements as they hurt latency significantly.
int nodelack = 1; int nodelack = 1;
rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char*) &nodelack, rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char*) &nodelack,
sizeof (int)); sizeof (int));
errno_assert (rc != SOCKET_ERROR); tcp_assert_tuning_error(s_, rc);
#endif #endif
return rc;
} }
void zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_) int zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_)
{ {
const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF, const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF,
(char*) &bufsize_, sizeof bufsize_); (char*) &bufsize_, sizeof bufsize_);
#ifdef ZMQ_HAVE_WINDOWS tcp_assert_tuning_error(sockfd_, rc);
wsa_assert (rc != SOCKET_ERROR); return rc;
#else
errno_assert (rc == 0);
#endif
} }
void zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_) int zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_)
{ {
const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF, const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF,
(char *) &bufsize_, sizeof bufsize_); (char *) &bufsize_, sizeof bufsize_);
#ifdef ZMQ_HAVE_WINDOWS tcp_assert_tuning_error(sockfd_, rc);
wsa_assert (rc != SOCKET_ERROR); return rc;
#else
errno_assert (rc == 0);
#endif
} }
void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_,
int keepalive_idle_, int keepalive_intvl_) int keepalive_idle_, int keepalive_intvl_)
{ {
// These options are used only under certain #ifdefs below. // These options are used only under certain #ifdefs below.
...@@ -115,20 +108,26 @@ void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, ...@@ -115,20 +108,26 @@ void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_,
DWORD num_bytes_returned; DWORD num_bytes_returned;
int rc = WSAIoctl(s_, SIO_KEEPALIVE_VALS, &keepalive_opts, int rc = WSAIoctl(s_, SIO_KEEPALIVE_VALS, &keepalive_opts,
sizeof(keepalive_opts), NULL, 0, &num_bytes_returned, NULL, NULL); sizeof(keepalive_opts), NULL, 0, &num_bytes_returned, NULL, NULL);
wsa_assert (rc != SOCKET_ERROR); tcp_assert_tuning_error(s_, rc);
if (rc == SOCKET_ERROR)
return rc;
} }
#else #else
#ifdef ZMQ_HAVE_SO_KEEPALIVE #ifdef ZMQ_HAVE_SO_KEEPALIVE
if (keepalive_ != -1) { if (keepalive_ != -1) {
int rc = setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE, int rc = setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE,
(char*) &keepalive_, sizeof (int)); (char*) &keepalive_, sizeof (int));
errno_assert (rc == 0); tcp_assert_tuning_error(s_, rc);
if (rc != 0)
return rc;
#ifdef ZMQ_HAVE_TCP_KEEPCNT #ifdef ZMQ_HAVE_TCP_KEEPCNT
if (keepalive_cnt_ != -1) { if (keepalive_cnt_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT, int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT,
&keepalive_cnt_, sizeof (int)); &keepalive_cnt_, sizeof (int));
errno_assert (rc == 0); tcp_assert_tuning_error(s_, rc);
if (rc != 0)
return rc;
} }
#endif // ZMQ_HAVE_TCP_KEEPCNT #endif // ZMQ_HAVE_TCP_KEEPCNT
...@@ -136,14 +135,18 @@ void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, ...@@ -136,14 +135,18 @@ void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_,
if (keepalive_idle_ != -1) { if (keepalive_idle_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE, int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE,
&keepalive_idle_, sizeof (int)); &keepalive_idle_, sizeof (int));
errno_assert (rc == 0); tcp_assert_tuning_error(s_, rc);
if (rc != 0)
return rc;
} }
#else // ZMQ_HAVE_TCP_KEEPIDLE #else // ZMQ_HAVE_TCP_KEEPIDLE
#ifdef ZMQ_HAVE_TCP_KEEPALIVE #ifdef ZMQ_HAVE_TCP_KEEPALIVE
if (keepalive_idle_ != -1) { if (keepalive_idle_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE, int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE,
&keepalive_idle_, sizeof (int)); &keepalive_idle_, sizeof (int));
errno_assert (rc == 0); tcp_assert_tuning_error(s_, rc);
if (rc != 0)
return rc;
} }
#endif // ZMQ_HAVE_TCP_KEEPALIVE #endif // ZMQ_HAVE_TCP_KEEPALIVE
#endif // ZMQ_HAVE_TCP_KEEPIDLE #endif // ZMQ_HAVE_TCP_KEEPIDLE
...@@ -152,18 +155,22 @@ void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, ...@@ -152,18 +155,22 @@ void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_,
if (keepalive_intvl_ != -1) { if (keepalive_intvl_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL, int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL,
&keepalive_intvl_, sizeof (int)); &keepalive_intvl_, sizeof (int));
errno_assert (rc == 0); tcp_assert_tuning_error(s_, rc);
if (rc != 0)
return rc;
} }
#endif // ZMQ_HAVE_TCP_KEEPINTVL #endif // ZMQ_HAVE_TCP_KEEPINTVL
} }
#endif // ZMQ_HAVE_SO_KEEPALIVE #endif // ZMQ_HAVE_SO_KEEPALIVE
#endif // ZMQ_HAVE_WINDOWS #endif // ZMQ_HAVE_WINDOWS
return 0;
} }
void zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_) int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
{ {
if (timeout_ <= 0) if (timeout_ <= 0)
return; return 0;
LIBZMQ_UNUSED (sockfd_); LIBZMQ_UNUSED (sockfd_);
...@@ -172,13 +179,16 @@ void zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_) ...@@ -172,13 +179,16 @@ void zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
timeout_ /= 1000; // in seconds timeout_ /= 1000; // in seconds
int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT, (char*) &timeout_, int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT, (char*) &timeout_,
sizeof(timeout_)); sizeof(timeout_));
wsa_assert (rc != SOCKET_ERROR); tcp_assert_tuning_error(sockfd_, rc);
return rc;
// FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT // FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT
#elif defined (TCP_USER_TIMEOUT) #elif defined (TCP_USER_TIMEOUT)
int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_, int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_,
sizeof(timeout_)); sizeof(timeout_));
errno_assert (rc == 0); tcp_assert_tuning_error(sockfd_, rc);
return rc;
#endif #endif
return 0;
} }
int zmq::tcp_write (fd_t s_, const void *data_, size_t size_) int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
...@@ -289,3 +299,54 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_) ...@@ -289,3 +299,54 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
#endif #endif
} }
void zmq::tcp_assert_tuning_error(zmq::fd_t s_, int rc_)
{
if (rc_ == 0)
return;
// Check whether an error occurred
int err = 0;
#ifdef ZMQ_HAVE_HPUX
int len = sizeof err;
#else
socklen_t len = sizeof err;
#endif
int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert (rc == 0);
if (err != 0) {
wsa_assert (err == WSAECONNREFUSED
|| err == WSAETIMEDOUT
|| err == WSAECONNABORTED
|| err == WSAEHOSTUNREACH
|| err == WSAENETUNREACH
|| err == WSAENETDOWN
|| err == WSAEACCES
|| err == WSAEINVAL
|| err == WSAEADDRINUSE);
}
#else
// Following code should handle both Berkeley-derived socket
// implementations and Solaris.
if (rc == -1)
err = errno;
if (err != 0) {
errno = err;
errno_assert (
errno == ECONNREFUSED ||
errno == ECONNRESET ||
errno == ECONNABORTED ||
errno == EINTR ||
errno == ETIMEDOUT ||
errno == EHOSTUNREACH ||
errno == ENETUNREACH ||
errno == ENETDOWN ||
errno == EINVAL);
}
#endif
}
...@@ -36,20 +36,20 @@ namespace zmq ...@@ -36,20 +36,20 @@ namespace zmq
{ {
// Tunes the supplied TCP socket for the best latency. // Tunes the supplied TCP socket for the best latency.
void tune_tcp_socket (fd_t s_); int tune_tcp_socket (fd_t s_);
// Sets the socket send buffer size. // Sets the socket send buffer size.
void set_tcp_send_buffer (fd_t sockfd_, int bufsize_); int set_tcp_send_buffer (fd_t sockfd_, int bufsize_);
// Sets the socket receive buffer size. // Sets the socket receive buffer size.
void set_tcp_receive_buffer (fd_t sockfd_, int bufsize_); int set_tcp_receive_buffer (fd_t sockfd_, int bufsize_);
// Tunes TCP keep-alives // Tunes TCP keep-alives
void tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_,
int keepalive_idle_, int keepalive_intvl_); int keepalive_idle_, int keepalive_intvl_);
// Tunes TCP max retransmit timeout // Tunes TCP max retransmit timeout
void tune_tcp_maxrt (fd_t sockfd_, int timeout_); int tune_tcp_maxrt (fd_t sockfd_, int timeout_);
// Writes data to the socket. Returns the number of bytes actually // Writes data to the socket. Returns the number of bytes actually
// written (even zero is to be considered to be a success). In case // written (even zero is to be considered to be a success). In case
...@@ -61,6 +61,9 @@ namespace zmq ...@@ -61,6 +61,9 @@ namespace zmq
// Zero indicates the peer has closed the connection. // Zero indicates the peer has closed the connection.
int tcp_read (fd_t s_, void *data_, size_t size_); int tcp_read (fd_t s_, void *data_, size_t size_);
// Asserts that an internal error did not occur. Does not assert
// on network errors such as reset or aborted connections.
void tcp_assert_tuning_error(fd_t s_, int rc_);
} }
#endif #endif
...@@ -136,6 +136,7 @@ void zmq::tcp_connecter_t::out_event () ...@@ -136,6 +136,7 @@ void zmq::tcp_connecter_t::out_event ()
handle_valid = false; handle_valid = false;
const fd_t fd = connect (); const fd_t fd = connect ();
// Handle the error condition by attempt to reconnect. // Handle the error condition by attempt to reconnect.
if (fd == retired_fd) { if (fd == retired_fd) {
close (); close ();
...@@ -143,10 +144,15 @@ void zmq::tcp_connecter_t::out_event () ...@@ -143,10 +144,15 @@ void zmq::tcp_connecter_t::out_event ()
return; return;
} }
tune_tcp_socket (fd); int rc = tune_tcp_socket (fd);
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, rc = rc | tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
options.tcp_keepalive_idle, options.tcp_keepalive_intvl); options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
tune_tcp_maxrt (fd, options.tcp_maxrt); rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt);
if (rc != 0) {
close ();
add_reconnect_timer ();
return;
}
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t *engine = new (std::nothrow)
......
...@@ -97,10 +97,14 @@ void zmq::tcp_listener_t::in_event () ...@@ -97,10 +97,14 @@ void zmq::tcp_listener_t::in_event ()
return; return;
} }
tune_tcp_socket (fd); int rc = tune_tcp_socket (fd);
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, rc = rc | tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
options.tcp_keepalive_idle, options.tcp_keepalive_intvl); options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
tune_tcp_maxrt (fd, options.tcp_maxrt); rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt);
if (rc != 0) {
socket->event_accept_failed (endpoint, zmq_errno());
return;
}
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t *engine = new (std::nothrow)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment