Unverified Commit d9ade476 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3202 from sigiesec/code-improvements

Simplified and optimized several code fragments
parents da8024d3 19a70ab6
...@@ -448,10 +448,7 @@ int zmq::thread_ctx_t::set (int option_, int optval_) ...@@ -448,10 +448,7 @@ int zmq::thread_ctx_t::set (int option_, int optval_)
_thread_affinity_cpus.insert (optval_); _thread_affinity_cpus.insert (optval_);
} else if (option_ == ZMQ_THREAD_AFFINITY_CPU_REMOVE && optval_ >= 0) { } else if (option_ == ZMQ_THREAD_AFFINITY_CPU_REMOVE && optval_ >= 0) {
scoped_lock_t locker (_opt_sync); scoped_lock_t locker (_opt_sync);
std::set<int>::iterator it = _thread_affinity_cpus.find (optval_); if (0 == _thread_affinity_cpus.erase (optval_)) {
if (it != _thread_affinity_cpus.end ()) {
_thread_affinity_cpus.erase (it);
} else {
errno = EINVAL; errno = EINVAL;
rc = -1; rc = -1;
} }
...@@ -531,15 +528,16 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) ...@@ -531,15 +528,16 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
{ {
scoped_lock_t locker (_endpoints_sync); scoped_lock_t locker (_endpoints_sync);
endpoints_t::iterator it = _endpoints.begin (); for (endpoints_t::iterator it = _endpoints.begin ();
while (it != _endpoints.end ()) { it != _endpoints.end ();) {
if (it->second.socket == socket_) { if (it->second.socket == socket_)
endpoints_t::iterator to_erase = it; #if __cplusplus >= 201103L
it = _endpoints.erase (it);
#else
_endpoints.erase (it++);
#endif
else
++it; ++it;
_endpoints.erase (to_erase);
continue;
}
++it;
} }
} }
......
...@@ -137,16 +137,11 @@ int zmq::dish_t::xleave (const char *group_) ...@@ -137,16 +137,11 @@ int zmq::dish_t::xleave (const char *group_)
return -1; return -1;
} }
subscriptions_t::iterator it = if (0 == _subscriptions.erase (group)) {
std::find (_subscriptions.begin (), _subscriptions.end (), group);
if (it == _subscriptions.end ()) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
_subscriptions.erase (it);
msg_t msg; msg_t msg;
int rc = msg.init_leave (); int rc = msg.init_leave ();
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -183,27 +178,31 @@ int zmq::dish_t::xrecv (msg_t *msg_) ...@@ -183,27 +178,31 @@ int zmq::dish_t::xrecv (msg_t *msg_)
// If there's already a message prepared by a previous call to zmq_poll, // If there's already a message prepared by a previous call to zmq_poll,
// return it straight ahead. // return it straight ahead.
if (_has_message) { if (_has_message) {
int rc = msg_->move (_message); const int rc = msg_->move (_message);
errno_assert (rc == 0); errno_assert (rc == 0);
_has_message = false; _has_message = false;
return 0; return 0;
} }
while (true) { return xxrecv (msg_);
}
int zmq::dish_t::xxrecv (msg_t *msg_)
{
do {
// Get a message using fair queueing algorithm. // Get a message using fair queueing algorithm.
int rc = _fq.recv (msg_); const int rc = _fq.recv (msg_);
// If there's no message available, return immediately. // If there's no message available, return immediately.
// The same when error occurs. // The same when error occurs.
if (rc != 0) if (rc != 0)
return -1; return -1;
// Filtering non matching messages // Skip non matching messages
subscriptions_t::iterator it = } while (0 == _subscriptions.count (std::string (msg_->group ())));
_subscriptions.find (std::string (msg_->group ()));
if (it != _subscriptions.end ()) // Found a matching message
return 0; return 0;
}
} }
bool zmq::dish_t::xhas_in () bool zmq::dish_t::xhas_in ()
...@@ -213,25 +212,15 @@ bool zmq::dish_t::xhas_in () ...@@ -213,25 +212,15 @@ bool zmq::dish_t::xhas_in ()
if (_has_message) if (_has_message)
return true; return true;
while (true) { const int rc = xxrecv (&_message);
// Get a message using fair queueing algorithm. if (rc != 0) {
int rc = _fq.recv (&_message); errno_assert (errno == EAGAIN);
return false;
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0) {
errno_assert (errno == EAGAIN);
return false;
}
// Filtering non matching messages
subscriptions_t::iterator it =
_subscriptions.find (std::string (_message.group ()));
if (it != _subscriptions.end ()) {
_has_message = true;
return true;
}
} }
// Matching message found
_has_message = true;
return true;
} }
const zmq::blob_t &zmq::dish_t::get_credential () const const zmq::blob_t &zmq::dish_t::get_credential () const
......
...@@ -68,6 +68,8 @@ class dish_t : public socket_base_t ...@@ -68,6 +68,8 @@ class dish_t : public socket_base_t
int xleave (const char *group_); int xleave (const char *group_);
private: private:
int xxrecv (zmq::msg_t *msg_);
// Send subscriptions to a pipe // Send subscriptions to a pipe
void send_subscriptions (pipe_t *pipe_); void send_subscriptions (pipe_t *pipe_);
......
...@@ -604,8 +604,7 @@ int zmq::ip_resolver_t::resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_) ...@@ -604,8 +604,7 @@ int zmq::ip_resolver_t::resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_)
const int max_attempts = 10; const int max_attempts = 10;
int iterations = 0; int iterations = 0;
IP_ADAPTER_ADDRESSES *addresses = NULL; IP_ADAPTER_ADDRESSES *addresses;
IP_ADAPTER_ADDRESSES *current_addresses = NULL;
unsigned long out_buf_len = sizeof (IP_ADAPTER_ADDRESSES); unsigned long out_buf_len = sizeof (IP_ADAPTER_ADDRESSES);
do { do {
...@@ -627,27 +626,25 @@ int zmq::ip_resolver_t::resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_) ...@@ -627,27 +626,25 @@ int zmq::ip_resolver_t::resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_)
} while ((rc == ERROR_BUFFER_OVERFLOW) && (iterations < max_attempts)); } while ((rc == ERROR_BUFFER_OVERFLOW) && (iterations < max_attempts));
if (rc == 0) { if (rc == 0) {
current_addresses = addresses; for (const IP_ADAPTER_ADDRESSES *current_addresses = addresses;
while (current_addresses) { current_addresses; current_addresses = current_addresses->Next) {
char *if_name = NULL; char *if_name = NULL;
char *if_friendly_name = NULL; char *if_friendly_name = NULL;
int str_rc1, str_rc2;
str_rc1 = get_interface_name (current_addresses->IfIndex, &if_name); const int str_rc1 =
str_rc2 = wchar_to_utf8 (current_addresses->FriendlyName, get_interface_name (current_addresses->IfIndex, &if_name);
&if_friendly_name); const int str_rc2 = wchar_to_utf8 (current_addresses->FriendlyName,
&if_friendly_name);
// Find a network adapter by its "name" or "friendly name" // Find a network adapter by its "name" or "friendly name"
if (((str_rc1 == 0) && (!strcmp (nic_, if_name))) if (((str_rc1 == 0) && (!strcmp (nic_, if_name)))
|| ((str_rc2 == 0) && (!strcmp (nic_, if_friendly_name)))) { || ((str_rc2 == 0) && (!strcmp (nic_, if_friendly_name)))) {
// Iterate over all unicast addresses bound to the current network interface // Iterate over all unicast addresses bound to the current network interface
IP_ADAPTER_UNICAST_ADDRESS *unicast_address = for (const IP_ADAPTER_UNICAST_ADDRESS *current_unicast_address =
current_addresses->FirstUnicastAddress; current_addresses->FirstUnicastAddress;
IP_ADAPTER_UNICAST_ADDRESS *current_unicast_address = current_unicast_address;
unicast_address; current_unicast_address = current_unicast_address->Next) {
const ADDRESS_FAMILY family =
while (current_unicast_address) {
ADDRESS_FAMILY family =
current_unicast_address->Address.lpSockaddr->sa_family; current_unicast_address->Address.lpSockaddr->sa_family;
if (family == (_options.ipv6 () ? AF_INET6 : AF_INET)) { if (family == (_options.ipv6 () ? AF_INET6 : AF_INET)) {
...@@ -658,8 +655,6 @@ int zmq::ip_resolver_t::resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_) ...@@ -658,8 +655,6 @@ int zmq::ip_resolver_t::resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_)
found = true; found = true;
break; break;
} }
current_unicast_address = current_unicast_address->Next;
} }
if (found) if (found)
...@@ -670,8 +665,6 @@ int zmq::ip_resolver_t::resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_) ...@@ -670,8 +665,6 @@ int zmq::ip_resolver_t::resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_)
free (if_name); free (if_name);
if (str_rc2 == 0) if (str_rc2 == 0)
free (if_friendly_name); free (if_friendly_name);
current_addresses = current_addresses->Next;
} }
free (addresses); free (addresses);
......
...@@ -32,6 +32,8 @@ ...@@ -32,6 +32,8 @@
#include "clock.hpp" #include "clock.hpp"
#include "err.hpp" #include "err.hpp"
#include <algorithm>
zmq::mailbox_safe_t::mailbox_safe_t (mutex_t *sync_) : _sync (sync_) zmq::mailbox_safe_t::mailbox_safe_t (mutex_t *sync_) : _sync (sync_)
{ {
// Get the pipe into passive state. That way, if the users starts by // Get the pipe into passive state. That way, if the users starts by
...@@ -58,13 +60,9 @@ void zmq::mailbox_safe_t::add_signaler (signaler_t *signaler_) ...@@ -58,13 +60,9 @@ void zmq::mailbox_safe_t::add_signaler (signaler_t *signaler_)
void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_) void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_)
{ {
std::vector<signaler_t *>::iterator it = _signalers.begin ();
// TODO: make a copy of array and signal outside the lock // TODO: make a copy of array and signal outside the lock
for (; it != _signalers.end (); ++it) { std::vector<signaler_t *>::iterator it =
if (*it == signaler_) std::find (_signalers.begin (), _signalers.end (), signaler_);
break;
}
if (it != _signalers.end ()) if (it != _signalers.end ())
_signalers.erase (it); _signalers.erase (it);
......
...@@ -102,15 +102,12 @@ void zmq::own_t::process_term_req (own_t *object_) ...@@ -102,15 +102,12 @@ void zmq::own_t::process_term_req (own_t *object_)
if (_terminating) if (_terminating)
return; return;
// If I/O object is well and alive let's ask it to terminate.
owned_t::iterator it = std::find (_owned.begin (), _owned.end (), object_);
// If not found, we assume that termination request was already sent to // If not found, we assume that termination request was already sent to
// the object so we can safely ignore the request. // the object so we can safely ignore the request.
if (it == _owned.end ()) if (0 == _owned.erase (object_))
return; return;
_owned.erase (it); // If I/O object is well and alive let's ask it to terminate.
register_term_acks (1); register_term_acks (1);
// Note that this object is the root of the (partial shutdown) thus, its // Note that this object is the root of the (partial shutdown) thus, its
......
...@@ -82,29 +82,32 @@ uint64_t zmq::poller_base_t::execute_timers () ...@@ -82,29 +82,32 @@ uint64_t zmq::poller_base_t::execute_timers ()
return 0; return 0;
// Get the current time. // Get the current time.
uint64_t current = _clock.now_ms (); const uint64_t current = _clock.now_ms ();
// Execute the timers that are already due. // Execute the timers that are already due.
timers_t::iterator it = _timers.begin (); const timers_t::iterator begin = _timers.begin ();
while (it != _timers.end ()) { const timers_t::iterator end = _timers.end ();
uint64_t res = 0;
timers_t::iterator it = begin;
for (; it != end; ++it) {
// If we have to wait to execute the item, same will be true about // If we have to wait to execute the item, same will be true about
// all the following items (multimap is sorted). Thus we can stop // all the following items (multimap is sorted). Thus we can stop
// checking the subsequent timers and return the time to wait for // checking the subsequent timers.
// the next timer (at least 1ms). if (it->first > current) {
if (it->first > current) res = it->first - current;
return it->first - current; break;
}
// Trigger the timer. // Trigger the timer.
it->second.sink->timer_event (it->second.id); it->second.sink->timer_event (it->second.id);
// Remove it from the list of active timers.
timers_t::iterator o = it;
++it;
_timers.erase (o);
} }
// There are no more timers. // Remove them from the list of active timers.
return 0; _timers.erase (begin, it);
// Return the time to wait for the next timer (at least 1ms), or 0, if
// there are no more timers.
return res;
} }
zmq::worker_poller_base_t::worker_poller_base_t (const thread_ctx_t &ctx_) : zmq::worker_poller_base_t::worker_poller_base_t (const thread_ctx_t &ctx_) :
......
...@@ -122,12 +122,14 @@ int zmq::radio_t::xsetsockopt (int option_, ...@@ -122,12 +122,14 @@ int zmq::radio_t::xsetsockopt (int option_,
void zmq::radio_t::xpipe_terminated (pipe_t *pipe_) void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
{ {
// NOTE: erase invalidates an iterator, and that's why it's not incrementing in post-loop
// read-after-free caught by Valgrind, see https://github.com/zeromq/libzmq/pull/1771
for (subscriptions_t::iterator it = _subscriptions.begin (); for (subscriptions_t::iterator it = _subscriptions.begin ();
it != _subscriptions.end ();) { it != _subscriptions.end ();) {
if (it->second == pipe_) { if (it->second == pipe_) {
#if __cplusplus >= 201103L
it = _subscriptions.erase (it);
#else
_subscriptions.erase (it++); _subscriptions.erase (it++);
#endif
} else { } else {
++it; ++it;
} }
......
...@@ -150,10 +150,7 @@ int zmq::router_t::xsetsockopt (int option_, ...@@ -150,10 +150,7 @@ int zmq::router_t::xsetsockopt (int option_,
void zmq::router_t::xpipe_terminated (pipe_t *pipe_) void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
{ {
std::set<pipe_t *>::iterator it = _anonymous_pipes.find (pipe_); if (0 == _anonymous_pipes.erase (pipe_)) {
if (it != _anonymous_pipes.end ())
_anonymous_pipes.erase (it);
else {
erase_out_pipe (pipe_); erase_out_pipe (pipe_);
_fq.pipe_terminated (pipe_); _fq.pipe_terminated (pipe_);
pipe_->rollback (); pipe_->rollback ();
......
...@@ -187,15 +187,10 @@ void zmq::signaler_t::send () ...@@ -187,15 +187,10 @@ void zmq::signaler_t::send ()
errno_assert (sz == sizeof (inc)); errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS #elif defined ZMQ_HAVE_WINDOWS
unsigned char dummy = 0; unsigned char dummy = 0;
while (true) { const int nbytes =
int nbytes = ::send (_w, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
::send (_w, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0); wsa_assert (nbytes != SOCKET_ERROR);
wsa_assert (nbytes != SOCKET_ERROR); zmq_assert (nbytes == sizeof (dummy));
if (unlikely (nbytes == SOCKET_ERROR))
continue;
zmq_assert (nbytes == sizeof (dummy));
break;
}
#elif defined ZMQ_HAVE_VXWORKS #elif defined ZMQ_HAVE_VXWORKS
unsigned char dummy = 0; unsigned char dummy = 0;
while (true) { while (true) {
......
...@@ -1317,12 +1317,7 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) ...@@ -1317,12 +1317,7 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
{ {
int rc; if (timeout_ == 0) {
command_t cmd;
if (timeout_ != 0) {
// If we are asked to wait, simply ask mailbox to wait.
rc = _mailbox->recv (&cmd, timeout_);
} else {
// If we are asked not to wait, check whether we haven't processed // If we are asked not to wait, check whether we haven't processed
// commands recently, so that we can throttle the new commands. // commands recently, so that we can throttle the new commands.
...@@ -1343,11 +1338,12 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) ...@@ -1343,11 +1338,12 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
return 0; return 0;
_last_tsc = tsc; _last_tsc = tsc;
} }
// Check whether there are any commands pending for this thread.
rc = _mailbox->recv (&cmd, 0);
} }
// Check whether there are any commands pending for this thread.
command_t cmd;
int rc = _mailbox->recv (&cmd, timeout_);
// Process all available commands. // Process all available commands.
while (rc == 0) { while (rc == 0) {
cmd.destination->process_command (cmd); cmd.destination->process_command (cmd);
......
...@@ -135,65 +135,50 @@ int zmq::timers_t::reset (int timer_id_) ...@@ -135,65 +135,50 @@ int zmq::timers_t::reset (int timer_id_)
long zmq::timers_t::timeout () long zmq::timers_t::timeout ()
{ {
timersmap_t::iterator it = _timers.begin (); const uint64_t now = _clock.now_ms ();
long res = -1;
uint64_t now = _clock.now_ms ();
while (it != _timers.end ()) {
cancelled_timers_t::iterator cancelled_it =
_cancelled_timers.find (it->second.timer_id);
// Live timer, lets return the timeout
if (cancelled_it == _cancelled_timers.end ()) {
if (it->first > now)
return static_cast<long> (it->first - now);
return 0; const timersmap_t::iterator begin = _timers.begin ();
const timersmap_t::iterator end = _timers.end ();
timersmap_t::iterator it = begin;
for (; it != end; ++it) {
if (0 == _cancelled_timers.erase (it->second.timer_id)) {
// Live timer, lets return the timeout
res = it->first > now ? static_cast<long> (it->first - now) : 0;
break;
} }
// Let's remove it from the beginning of the list
timersmap_t::iterator old = it;
++it;
_timers.erase (old);
_cancelled_timers.erase (cancelled_it);
} }
// Wait forever as no timers are alive // Remove timed-out timers
return -1; _timers.erase (begin, it);
return res;
} }
int zmq::timers_t::execute () int zmq::timers_t::execute ()
{ {
timersmap_t::iterator it = _timers.begin (); const uint64_t now = _clock.now_ms ();
uint64_t now = _clock.now_ms ();
while (it != _timers.end ()) {
cancelled_timers_t::iterator cancelled_it =
_cancelled_timers.find (it->second.timer_id);
// Dead timer, lets remove it and continue const timersmap_t::iterator begin = _timers.begin ();
if (cancelled_it != _cancelled_timers.end ()) { const timersmap_t::iterator end = _timers.end ();
timersmap_t::iterator old = it; timersmap_t::iterator it = _timers.begin ();
++it; for (; it != end; ++it) {
_timers.erase (old); if (0 == _cancelled_timers.erase (it->second.timer_id)) {
_cancelled_timers.erase (cancelled_it); // Timer is not cancelled
continue;
}
// Map is ordered, if we have to wait for current timer we can stop. // Map is ordered, if we have to wait for current timer we can stop.
if (it->first > now) if (it->first > now)
break; break;
timer_t timer = it->second; const timer_t &timer = it->second;
timer.handler (timer.timer_id, timer.arg); timer.handler (timer.timer_id, timer.arg);
timersmap_t::iterator old = it; _timers.insert (
++it; timersmap_t::value_type (now + timer.interval, timer));
_timers.erase (old); }
_timers.insert (timersmap_t::value_type (now + timer.interval, timer));
} }
_timers.erase (begin, it);
return 0; return 0;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment