Unverified Commit 59095f9d authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3765 from sigiesec/various-improvements

Various improvements
parents 3e7995f7 14ab7946
...@@ -63,7 +63,7 @@ ...@@ -63,7 +63,7 @@
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe #define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
#define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef #define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
int clipped_maxsocket (int max_requested_) static int clipped_maxsocket (int max_requested_)
{ {
if (max_requested_ >= zmq::poller_t::max_fds () if (max_requested_ >= zmq::poller_t::max_fds ()
&& zmq::poller_t::max_fds () != -1) && zmq::poller_t::max_fds () != -1)
......
...@@ -171,9 +171,13 @@ void zmq::dist_t::distribute (msg_t *msg_) ...@@ -171,9 +171,13 @@ void zmq::dist_t::distribute (msg_t *msg_)
} }
if (msg_->is_vsm ()) { if (msg_->is_vsm ()) {
for (pipes_t::size_type i = 0; i < _matching; ++i) for (pipes_t::size_type i = 0; i < _matching;) {
if (!write (_pipes[i], msg_)) if (!write (_pipes[i], msg_)) {
--i; // Retry last write because index will have been swapped // Use same index again because entry will have been removed.
} else {
++i;
}
}
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
rc = msg_->init (); rc = msg_->init ();
...@@ -187,11 +191,14 @@ void zmq::dist_t::distribute (msg_t *msg_) ...@@ -187,11 +191,14 @@ void zmq::dist_t::distribute (msg_t *msg_)
// Push copy of the message to each matching pipe. // Push copy of the message to each matching pipe.
int failed = 0; int failed = 0;
for (pipes_t::size_type i = 0; i < _matching; ++i) for (pipes_t::size_type i = 0; i < _matching;) {
if (!write (_pipes[i], msg_)) { if (!write (_pipes[i], msg_)) {
++failed; ++failed;
--i; // Retry last write because index will have been swapped // Use same index again because entry will have been removed.
} else {
++i;
} }
}
if (unlikely (failed)) if (unlikely (failed))
msg_->rm_refs (failed); msg_->rm_refs (failed);
......
...@@ -102,9 +102,9 @@ static int do_getsockopt_curve_key (void *const optval_, ...@@ -102,9 +102,9 @@ static int do_getsockopt_curve_key (void *const optval_,
#endif #endif
template <typename T> template <typename T>
int do_setsockopt (const void *const optval_, static int do_setsockopt (const void *const optval_,
const size_t optvallen_, const size_t optvallen_,
T *const out_value_) T *const out_value_)
{ {
if (optvallen_ == sizeof (T)) { if (optvallen_ == sizeof (T)) {
memcpy (out_value_, optval_, sizeof (T)); memcpy (out_value_, optval_, sizeof (T));
...@@ -176,9 +176,9 @@ do_setsockopt_string_allow_empty_relaxed (const void *const optval_, ...@@ -176,9 +176,9 @@ do_setsockopt_string_allow_empty_relaxed (const void *const optval_,
} }
template <typename T> template <typename T>
int do_setsockopt_set (const void *const optval_, static int do_setsockopt_set (const void *const optval_,
const size_t optvallen_, const size_t optvallen_,
std::set<T> *const set_) std::set<T> *const set_)
{ {
if (optvallen_ == 0 && optval_ == NULL) { if (optvallen_ == 0 && optval_ == NULL) {
set_->clear (); set_->clear ();
...@@ -592,7 +592,6 @@ int zmq::options_t::setsockopt (int option_, ...@@ -592,7 +592,6 @@ int zmq::options_t::setsockopt (int option_,
case ZMQ_ZAP_DOMAIN: case ZMQ_ZAP_DOMAIN:
return do_setsockopt_string_allow_empty_relaxed ( return do_setsockopt_string_allow_empty_relaxed (
optval_, optvallen_, &zap_domain, UCHAR_MAX); optval_, optvallen_, &zap_domain, UCHAR_MAX);
break;
// If curve encryption isn't built, these options provoke EINVAL // If curve encryption isn't built, these options provoke EINVAL
#ifdef ZMQ_HAVE_CURVE #ifdef ZMQ_HAVE_CURVE
...@@ -869,7 +868,6 @@ int zmq::options_t::getsockopt (int option_, ...@@ -869,7 +868,6 @@ int zmq::options_t::getsockopt (int option_,
case ZMQ_ROUTING_ID: case ZMQ_ROUTING_ID:
return do_getsockopt (optval_, optvallen_, routing_id, return do_getsockopt (optval_, optvallen_, routing_id,
routing_id_size); routing_id_size);
break;
case ZMQ_RATE: case ZMQ_RATE:
if (is_int) { if (is_int) {
...@@ -1014,15 +1012,12 @@ int zmq::options_t::getsockopt (int option_, ...@@ -1014,15 +1012,12 @@ int zmq::options_t::getsockopt (int option_,
case ZMQ_SOCKS_PROXY: case ZMQ_SOCKS_PROXY:
return do_getsockopt (optval_, optvallen_, socks_proxy_address); return do_getsockopt (optval_, optvallen_, socks_proxy_address);
break;
case ZMQ_SOCKS_USERNAME: case ZMQ_SOCKS_USERNAME:
return do_getsockopt (optval_, optvallen_, socks_proxy_username); return do_getsockopt (optval_, optvallen_, socks_proxy_username);
break;
case ZMQ_SOCKS_PASSWORD: case ZMQ_SOCKS_PASSWORD:
return do_getsockopt (optval_, optvallen_, socks_proxy_password); return do_getsockopt (optval_, optvallen_, socks_proxy_password);
break;
case ZMQ_TCP_KEEPALIVE: case ZMQ_TCP_KEEPALIVE:
if (is_int) { if (is_int) {
...@@ -1068,15 +1063,12 @@ int zmq::options_t::getsockopt (int option_, ...@@ -1068,15 +1063,12 @@ int zmq::options_t::getsockopt (int option_,
case ZMQ_PLAIN_USERNAME: case ZMQ_PLAIN_USERNAME:
return do_getsockopt (optval_, optvallen_, plain_username); return do_getsockopt (optval_, optvallen_, plain_username);
break;
case ZMQ_PLAIN_PASSWORD: case ZMQ_PLAIN_PASSWORD:
return do_getsockopt (optval_, optvallen_, plain_password); return do_getsockopt (optval_, optvallen_, plain_password);
break;
case ZMQ_ZAP_DOMAIN: case ZMQ_ZAP_DOMAIN:
return do_getsockopt (optval_, optvallen_, zap_domain); return do_getsockopt (optval_, optvallen_, zap_domain);
break;
// If curve encryption isn't built, these options provoke EINVAL // If curve encryption isn't built, these options provoke EINVAL
#ifdef ZMQ_HAVE_CURVE #ifdef ZMQ_HAVE_CURVE
...@@ -1090,17 +1082,14 @@ int zmq::options_t::getsockopt (int option_, ...@@ -1090,17 +1082,14 @@ int zmq::options_t::getsockopt (int option_,
case ZMQ_CURVE_PUBLICKEY: case ZMQ_CURVE_PUBLICKEY:
return do_getsockopt_curve_key (optval_, optvallen_, return do_getsockopt_curve_key (optval_, optvallen_,
curve_public_key); curve_public_key);
break;
case ZMQ_CURVE_SECRETKEY: case ZMQ_CURVE_SECRETKEY:
return do_getsockopt_curve_key (optval_, optvallen_, return do_getsockopt_curve_key (optval_, optvallen_,
curve_secret_key); curve_secret_key);
break;
case ZMQ_CURVE_SERVERKEY: case ZMQ_CURVE_SERVERKEY:
return do_getsockopt_curve_key (optval_, optvallen_, return do_getsockopt_curve_key (optval_, optvallen_,
curve_server_key); curve_server_key);
break;
#endif #endif
case ZMQ_CONFLATE: case ZMQ_CONFLATE:
...@@ -1121,11 +1110,9 @@ int zmq::options_t::getsockopt (int option_, ...@@ -1121,11 +1110,9 @@ int zmq::options_t::getsockopt (int option_,
case ZMQ_GSSAPI_PRINCIPAL: case ZMQ_GSSAPI_PRINCIPAL:
return do_getsockopt (optval_, optvallen_, gss_principal); return do_getsockopt (optval_, optvallen_, gss_principal);
break;
case ZMQ_GSSAPI_SERVICE_PRINCIPAL: case ZMQ_GSSAPI_SERVICE_PRINCIPAL:
return do_getsockopt (optval_, optvallen_, gss_service_principal); return do_getsockopt (optval_, optvallen_, gss_service_principal);
break;
case ZMQ_GSSAPI_PLAINTEXT: case ZMQ_GSSAPI_PLAINTEXT:
if (is_int) { if (is_int) {
...@@ -1193,7 +1180,6 @@ int zmq::options_t::getsockopt (int option_, ...@@ -1193,7 +1180,6 @@ int zmq::options_t::getsockopt (int option_,
case ZMQ_BINDTODEVICE: case ZMQ_BINDTODEVICE:
return do_getsockopt (optval_, optvallen_, bound_device); return do_getsockopt (optval_, optvallen_, bound_device);
break;
case ZMQ_ZAP_ENFORCE_DOMAIN: case ZMQ_ZAP_ENFORCE_DOMAIN:
if (is_int) { if (is_int) {
......
...@@ -89,9 +89,8 @@ typedef struct ...@@ -89,9 +89,8 @@ typedef struct
// Utility functions // Utility functions
int capture (class zmq::socket_base_t *capture_, static int
zmq::msg_t *msg_, capture (class zmq::socket_base_t *capture_, zmq::msg_t *msg_, int more_ = 0)
int more_ = 0)
{ {
// Copy message to capture socket if any // Copy message to capture socket if any
if (capture_) { if (capture_) {
...@@ -109,12 +108,12 @@ int capture (class zmq::socket_base_t *capture_, ...@@ -109,12 +108,12 @@ int capture (class zmq::socket_base_t *capture_,
return 0; return 0;
} }
int forward (class zmq::socket_base_t *from_, static int forward (class zmq::socket_base_t *from_,
zmq_socket_stats_t *from_stats_, zmq_socket_stats_t *from_stats_,
class zmq::socket_base_t *to_, class zmq::socket_base_t *to_,
zmq_socket_stats_t *to_stats_, zmq_socket_stats_t *to_stats_,
class zmq::socket_base_t *capture_, class zmq::socket_base_t *capture_,
zmq::msg_t *msg_) zmq::msg_t *msg_)
{ {
// Forward a burst of messages // Forward a burst of messages
for (unsigned int i = 0; i < zmq::proxy_burst_size; i++) { for (unsigned int i = 0; i < zmq::proxy_burst_size; i++) {
...@@ -184,9 +183,9 @@ static int loop_and_send_multipart_stat (zmq::socket_base_t *control_, ...@@ -184,9 +183,9 @@ static int loop_and_send_multipart_stat (zmq::socket_base_t *control_,
return rc; return rc;
} }
int reply_stats (class zmq::socket_base_t *control_, static int reply_stats (class zmq::socket_base_t *control_,
zmq_socket_stats_t *frontend_stats_, zmq_socket_stats_t *frontend_stats_,
zmq_socket_stats_t *backend_stats_) zmq_socket_stats_t *backend_stats_)
{ {
// first part: frontend stats - the first send might fail due to HWM // first part: frontend stats - the first send might fail due to HWM
if (loop_and_send_multipart_stat (control_, frontend_stats_->msg_in, true, if (loop_and_send_multipart_stat (control_, frontend_stats_->msg_in, true,
......
...@@ -596,14 +596,11 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, ...@@ -596,14 +596,11 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
static_cast<int> (std::min<uint64_t> (end - now, INT_MAX)); static_cast<int> (std::min<uint64_t> (end - now, INT_MAX));
// Wait for events. // Wait for events.
while (true) { int rc = poll (_pollfds, _pollset_size, timeout);
int rc = poll (_pollfds, _pollset_size, timeout); if (rc == -1 && errno == EINTR) {
if (rc == -1 && errno == EINTR) { return -1;
return -1;
}
errno_assert (rc >= 0);
break;
} }
errno_assert (rc >= 0);
// Receive the signal from pollfd // Receive the signal from pollfd
if (_use_signaler && _pollfds[0].revents & POLLIN) if (_use_signaler && _pollfds[0].revents & POLLIN)
...@@ -653,29 +650,26 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, ...@@ -653,29 +650,26 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
} }
// Wait for events. Ignore interrupts if there's infinite timeout. // Wait for events. Ignore interrupts if there's infinite timeout.
while (true) { memcpy (inset.get (), _pollset_in.get (),
memcpy (inset.get (), _pollset_in.get (), valid_pollset_bytes (*_pollset_in.get ()));
valid_pollset_bytes (*_pollset_in.get ())); memcpy (outset.get (), _pollset_out.get (),
memcpy (outset.get (), _pollset_out.get (), valid_pollset_bytes (*_pollset_out.get ()));
valid_pollset_bytes (*_pollset_out.get ())); memcpy (errset.get (), _pollset_err.get (),
memcpy (errset.get (), _pollset_err.get (), valid_pollset_bytes (*_pollset_err.get ()));
valid_pollset_bytes (*_pollset_err.get ())); const int rc = select (static_cast<int> (_max_fd + 1), inset.get (),
const int rc = select (static_cast<int> (_max_fd + 1), inset.get (), outset.get (), errset.get (), ptimeout);
outset.get (), errset.get (), ptimeout);
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
if (unlikely (rc == SOCKET_ERROR)) { if (unlikely (rc == SOCKET_ERROR)) {
errno = wsa_error_to_errno (WSAGetLastError ()); errno = wsa_error_to_errno (WSAGetLastError ());
wsa_assert (errno == ENOTSOCK); wsa_assert (errno == ENOTSOCK);
return -1; return -1;
} }
#else #else
if (unlikely (rc == -1)) { if (unlikely (rc == -1)) {
errno_assert (errno == EINTR || errno == EBADF); errno_assert (errno == EINTR || errno == EBADF);
return -1; return -1;
}
#endif
break;
} }
#endif
if (_use_signaler && FD_ISSET (_signaler->get_fd (), inset.get ())) if (_use_signaler && FD_ISSET (_signaler->get_fd (), inset.get ()))
_signaler->recv (); _signaler->recv ();
......
...@@ -884,11 +884,11 @@ encode_base64 (const unsigned char *in_, int in_len_, char *out_, int out_len_) ...@@ -884,11 +884,11 @@ encode_base64 (const unsigned char *in_, int in_len_, char *out_, int out_len_)
static const unsigned char base64enc_tab[65] = static const unsigned char base64enc_tab[65] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
int ii, io; int io = 0;
uint32_t v; uint32_t v = 0;
int rem; int rem = 0;
for (io = 0, ii = 0, v = 0, rem = 0; ii < in_len_; ii++) { for (int ii = 0; ii < in_len_; ii++) {
unsigned char ch; unsigned char ch;
ch = in_[ii]; ch = in_[ii];
v = (v << 8) | ch; v = (v << 8) | ch;
......
...@@ -726,7 +726,7 @@ const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_) ...@@ -726,7 +726,7 @@ const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_)
// Polling. // Polling.
#if defined ZMQ_HAVE_POLLER #if defined ZMQ_HAVE_POLLER
inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) static int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{ {
// implement zmq_poll on top of zmq_poller // implement zmq_poll on top of zmq_poller
int rc; int rc;
......
...@@ -333,5 +333,4 @@ int main (void) ...@@ -333,5 +333,4 @@ int main (void)
RUN_TEST (test_vanilla_socket); RUN_TEST (test_vanilla_socket);
RUN_TEST (test_unauth_creds); RUN_TEST (test_unauth_creds);
return UNITY_END (); return UNITY_END ();
return 0;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment