Unverified Commit 9544dade authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #2926 from sigiesec/fix-issue-2925

Problem: assertion failure in poller_base.cpp:42
parents a1d55d05 50d80d08
...@@ -541,8 +541,14 @@ ZMQ_EXPORT void zmq_atomic_counter_destroy (void **counter_p); ...@@ -541,8 +541,14 @@ ZMQ_EXPORT void zmq_atomic_counter_destroy (void **counter_p);
/* Starts the stopwatch. Returns the handle to the watch. */ /* Starts the stopwatch. Returns the handle to the watch. */
ZMQ_EXPORT void *zmq_stopwatch_start (void); ZMQ_EXPORT void *zmq_stopwatch_start (void);
#ifdef ZMQ_BUILD_DRAFT_API
/* Returns the number of microseconds elapsed since the stopwatch was */
/* started, but does not stop or deallocate the stopwatch. */
ZMQ_EXPORT unsigned long zmq_stopwatch_intermediate (void *watch_);
#endif
/* Stops the stopwatch. Returns the number of microseconds elapsed since */ /* Stops the stopwatch. Returns the number of microseconds elapsed since */
/* the stopwatch was started. */ /* the stopwatch was started, and deallocates that watch. */
ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_); ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_);
/* Sleeps for specified number of seconds. */ /* Sleeps for specified number of seconds. */
......
...@@ -42,7 +42,7 @@ zmq::poller_base_t::~poller_base_t () ...@@ -42,7 +42,7 @@ zmq::poller_base_t::~poller_base_t ()
zmq_assert (get_load () == 0); zmq_assert (get_load () == 0);
} }
int zmq::poller_base_t::get_load () int zmq::poller_base_t::get_load () const
{ {
return load.get (); return load.get ();
} }
......
...@@ -47,7 +47,7 @@ class poller_base_t ...@@ -47,7 +47,7 @@ class poller_base_t
// Returns load of the poller. Note that this function can be // Returns load of the poller. Note that this function can be
// invoked from a different thread! // invoked from a different thread!
int get_load (); int get_load () const;
// Add a timeout to expire in timeout_ milliseconds. After the // Add a timeout to expire in timeout_ milliseconds. After the
// expiration timer_event on sink_ object will be called with // expiration timer_event on sink_ object will be called with
......
...@@ -47,6 +47,8 @@ ...@@ -47,6 +47,8 @@
#include "config.hpp" #include "config.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
#include <algorithm>
zmq::select_t::select_t (const zmq::ctx_t &ctx_) : zmq::select_t::select_t (const zmq::ctx_t &ctx_) :
ctx (ctx_), ctx (ctx_),
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
...@@ -70,10 +72,13 @@ zmq::select_t::~select_t () ...@@ -70,10 +72,13 @@ zmq::select_t::~select_t ()
stop (); stop ();
worker.stop (); worker.stop ();
} }
zmq_assert (get_load () == 0);
} }
zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
{ {
zmq_assert (fd_ != retired_fd);
fd_entry_t fd_entry; fd_entry_t fd_entry;
fd_entry.fd = fd_; fd_entry.fd = fd_;
fd_entry.events = events_; fd_entry.events = events_;
...@@ -150,15 +155,20 @@ void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_, ...@@ -150,15 +155,20 @@ void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_,
} }
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
bool zmq::select_t::try_remove_fd_entry ( int zmq::select_t::try_retire_fd_entry (
family_entries_t::iterator family_entry_it, zmq::fd_t &handle_) family_entries_t::iterator family_entry_it, zmq::fd_t &handle_)
{ {
family_entry_t &family_entry = family_entry_it->second; family_entry_t &family_entry = family_entry_it->second;
fd_entries_t::iterator fd_entry_it = fd_entries_t::iterator fd_entry_it =
find_fd_entry_by_handle (family_entry.fd_entries, handle_); find_fd_entry_by_handle (family_entry.fd_entries, handle_);
if (fd_entry_it == family_entry.fd_entries.end ()) if (fd_entry_it == family_entry.fd_entries.end ())
return false; return 0;
fd_entry_t &fd_entry = *fd_entry_it;
zmq_assert (fd_entry.fd != retired_fd);
if (family_entry_it != current_family_entry_it) { if (family_entry_it != current_family_entry_it) {
// Family is not currently being iterated and can be safely // Family is not currently being iterated and can be safely
// modified in-place. So later it can be skipped without // modified in-place. So later it can be skipped without
...@@ -167,11 +177,11 @@ bool zmq::select_t::try_remove_fd_entry ( ...@@ -167,11 +177,11 @@ bool zmq::select_t::try_remove_fd_entry (
} else { } else {
// Otherwise mark removed entries as retired. It will be cleaned up // Otherwise mark removed entries as retired. It will be cleaned up
// at the end of the iteration. See zmq::select_t::loop // at the end of the iteration. See zmq::select_t::loop
fd_entry_it->fd = retired_fd; fd_entry.fd = retired_fd;
family_entry.retired = true; family_entry.has_retired = true;
} }
family_entry.fds_set.remove_fd (handle_); family_entry.fds_set.remove_fd (handle_);
return true; return 1;
} }
#endif #endif
...@@ -179,12 +189,12 @@ void zmq::select_t::rm_fd (handle_t handle_) ...@@ -179,12 +189,12 @@ void zmq::select_t::rm_fd (handle_t handle_)
{ {
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_); u_short family = get_fd_family (handle_);
int retired = 0;
if (family != AF_UNSPEC) { if (family != AF_UNSPEC) {
family_entries_t::iterator family_entry_it = family_entries_t::iterator family_entry_it =
family_entries.find (family); family_entries.find (family);
int removed = try_remove_fd_entry (family_entry_it, handle_); retired += try_retire_fd_entry (family_entry_it, handle_);
assert (removed);
} else { } else {
// get_fd_family may fail and return AF_UNSPEC if the socket was not // get_fd_family may fail and return AF_UNSPEC if the socket was not
// successfully connected. In that case, we need to look for the // successfully connected. In that case, we need to look for the
...@@ -193,8 +203,9 @@ void zmq::select_t::rm_fd (handle_t handle_) ...@@ -193,8 +203,9 @@ void zmq::select_t::rm_fd (handle_t handle_)
for (family_entries_t::iterator family_entry_it = for (family_entries_t::iterator family_entry_it =
family_entries.begin (); family_entries.begin ();
family_entry_it != end; ++family_entry_it) { family_entry_it != end; ++family_entry_it) {
if (try_remove_fd_entry (family_entry_it, handle_)) if (retired += try_retire_fd_entry (family_entry_it, handle_)) {
break; break;
}
} }
} }
#else #else
...@@ -202,9 +213,12 @@ void zmq::select_t::rm_fd (handle_t handle_) ...@@ -202,9 +213,12 @@ void zmq::select_t::rm_fd (handle_t handle_)
find_fd_entry_by_handle (family_entry.fd_entries, handle_); find_fd_entry_by_handle (family_entry.fd_entries, handle_);
assert (fd_entry_it != fd_entries.end ()); assert (fd_entry_it != fd_entries.end ());
zmq_assert (fd_entry_it->fd != retired_fd);
fd_entry_it->fd = retired_fd; fd_entry_it->fd = retired_fd;
family_entry.fds_set.remove_fd (handle_); family_entry.fds_set.remove_fd (handle_);
++retired;
if (handle_ == maxfd) { if (handle_ == maxfd) {
maxfd = retired_fd; maxfd = retired_fd;
for (fd_entry_it = family_entry.fd_entries.begin (); for (fd_entry_it = family_entry.fd_entries.begin ();
...@@ -213,8 +227,9 @@ void zmq::select_t::rm_fd (handle_t handle_) ...@@ -213,8 +227,9 @@ void zmq::select_t::rm_fd (handle_t handle_)
maxfd = fd_entry_it->fd; maxfd = fd_entry_it->fd;
} }
family_entry.retired = true; family_entry.has_retired = true;
#endif #endif
zmq_assert (retired == 1);
adjust_load (-1); adjust_load (-1);
} }
...@@ -274,11 +289,30 @@ int zmq::select_t::max_fds () ...@@ -274,11 +289,30 @@ int zmq::select_t::max_fds ()
return FD_SETSIZE; return FD_SETSIZE;
} }
// TODO should this be configurable?
const int max_shutdown_timeout = 250;
void zmq::select_t::loop () void zmq::select_t::loop ()
{ {
while (!stopping) { void *stopwatch = NULL;
while (!stopwatch || get_load ()) {
int max_timeout = INT_MAX;
if (stopping) {
if (stopwatch) {
max_timeout = max_shutdown_timeout
- (int) zmq_stopwatch_intermediate (stopwatch);
// bail out eventually, when max_shutdown_timeout has reached,
// to avoid spinning forever in case of some error
zmq_assert (max_timeout > 0);
} else {
stopwatch = zmq_stopwatch_start ();
max_timeout = max_shutdown_timeout;
}
}
// Execute any due timers. // Execute any due timers.
int timeout = (int) execute_timers (); int timeout = std::min ((int) execute_timers (), max_timeout);
#if defined ZMQ_HAVE_OSX #if defined ZMQ_HAVE_OSX
struct timeval tv = {(long) (timeout / 1000), timeout % 1000 * 1000}; struct timeval tv = {(long) (timeout / 1000), timeout % 1000 * 1000};
...@@ -379,6 +413,7 @@ void zmq::select_t::loop () ...@@ -379,6 +413,7 @@ void zmq::select_t::loop ()
select_family_entry (family_entry, maxfd, timeout > 0, tv); select_family_entry (family_entry, maxfd, timeout > 0, tv);
#endif #endif
} }
zmq_stopwatch_stop (stopwatch);
} }
void zmq::select_t::select_family_entry (family_entry_t &family_entry_, void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
...@@ -406,8 +441,8 @@ void zmq::select_t::select_family_entry (family_entry_t &family_entry_, ...@@ -406,8 +441,8 @@ void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
trigger_events (fd_entries, local_fds_set, rc); trigger_events (fd_entries, local_fds_set, rc);
if (family_entry_.retired) { if (family_entry_.has_retired) {
family_entry_.retired = false; family_entry_.has_retired = false;
family_entry_.fd_entries.erase (std::remove_if (fd_entries.begin (), family_entry_.fd_entries.erase (std::remove_if (fd_entries.begin (),
fd_entries.end (), fd_entries.end (),
is_retired_fd), is_retired_fd),
...@@ -543,7 +578,7 @@ u_short zmq::select_t::determine_fd_family (fd_t fd_) ...@@ -543,7 +578,7 @@ u_short zmq::select_t::determine_fd_family (fd_t fd_)
return AF_UNSPEC; return AF_UNSPEC;
} }
zmq::select_t::family_entry_t::family_entry_t () : retired (false) zmq::select_t::family_entry_t::family_entry_t () : has_retired (false)
{ {
} }
......
...@@ -119,7 +119,7 @@ class select_t : public poller_base_t ...@@ -119,7 +119,7 @@ class select_t : public poller_base_t
fd_entries_t fd_entries; fd_entries_t fd_entries;
fds_set_t fds_set; fds_set_t fds_set;
bool retired; bool has_retired;
}; };
void select_family_entry (family_entry_t &family_entry_, void select_family_entry (family_entry_t &family_entry_,
...@@ -143,8 +143,8 @@ class select_t : public poller_base_t ...@@ -143,8 +143,8 @@ class select_t : public poller_base_t
// See loop for details. // See loop for details.
family_entries_t::iterator current_family_entry_it; family_entries_t::iterator current_family_entry_it;
bool try_remove_fd_entry (family_entries_t::iterator family_entry_it, int try_retire_fd_entry (family_entries_t::iterator family_entry_it,
zmq::fd_t &handle_); zmq::fd_t &handle_);
static const size_t fd_family_cache_size = 8; static const size_t fd_family_cache_size = 8;
std::pair<fd_t, u_short> fd_family_cache[fd_family_cache_size]; std::pair<fd_t, u_short> fd_family_cache[fd_family_cache_size];
......
...@@ -37,6 +37,10 @@ ...@@ -37,6 +37,10 @@
#ifndef ZMQ_BUILD_DRAFT_API #ifndef ZMQ_BUILD_DRAFT_API
/* Returns the number of microseconds elapsed since the stopwatch was */
/* started, but does not stop or deallocate the stopwatch. */
unsigned long zmq_stopwatch_intermediate (void *watch_);
/* DRAFT Socket types. */ /* DRAFT Socket types. */
#define ZMQ_SERVER 12 #define ZMQ_SERVER 12
#define ZMQ_CLIENT 13 #define ZMQ_CLIENT 13
......
...@@ -66,14 +66,20 @@ void *zmq_stopwatch_start () ...@@ -66,14 +66,20 @@ void *zmq_stopwatch_start ()
return (void *) watch; return (void *) watch;
} }
unsigned long zmq_stopwatch_stop (void *watch_) unsigned long zmq_stopwatch_intermediate (void *watch_)
{ {
uint64_t end = zmq::clock_t::now_us (); uint64_t end = zmq::clock_t::now_us ();
uint64_t start = *(uint64_t *) watch_; uint64_t start = *(uint64_t *) watch_;
free (watch_);
return (unsigned long) (end - start); return (unsigned long) (end - start);
} }
unsigned long zmq_stopwatch_stop (void *watch_)
{
unsigned long res = zmq_stopwatch_intermediate (watch_);
free (watch_);
return res;
}
void *zmq_threadstart (zmq_thread_fn *func, void *arg) void *zmq_threadstart (zmq_thread_fn *func, void *arg)
{ {
zmq::thread_t *thread = new (std::nothrow) zmq::thread_t; zmq::thread_t *thread = new (std::nothrow) zmq::thread_t;
......
...@@ -47,7 +47,8 @@ int main (void) ...@@ -47,7 +47,8 @@ int main (void)
assert (zmq_errno () == EAGAIN); assert (zmq_errno () == EAGAIN);
// Check whether receive timeout is honored // Check whether receive timeout is honored
int timeout = 250; const int timeout = 250;
const int jitter = 50;
rc = zmq_setsockopt (frontend, ZMQ_RCVTIMEO, &timeout, sizeof (int)); rc = zmq_setsockopt (frontend, ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0); assert (rc == 0);
...@@ -56,7 +57,14 @@ int main (void) ...@@ -56,7 +57,14 @@ int main (void)
assert (rc == -1); assert (rc == -1);
assert (zmq_errno () == EAGAIN); assert (zmq_errno () == EAGAIN);
unsigned int elapsed = zmq_stopwatch_stop (stopwatch) / 1000; unsigned int elapsed = zmq_stopwatch_stop (stopwatch) / 1000;
assert (elapsed > 200 && elapsed < 300); assert (elapsed > timeout - jitter);
if (elapsed >= timeout + jitter) {
// we cannot assert this on a non-RT system
fprintf (stderr,
"zmq_recv took quite long, with a timeout of %i ms, it took "
"actually %i ms\n",
timeout, elapsed);
}
// Check that normal message flow works as expected // Check that normal message flow works as expected
void *backend = zmq_socket (ctx, ZMQ_DEALER); void *backend = zmq_socket (ctx, ZMQ_DEALER);
......
...@@ -154,13 +154,22 @@ int main (void) ...@@ -154,13 +154,22 @@ int main (void)
bool timer_invoked = false; bool timer_invoked = false;
int timer_id = zmq_timers_add (timers, 100, handler, &timer_invoked); const int full_timeout = 100;
void *const stopwatch = zmq_stopwatch_start ();
int timer_id =
zmq_timers_add (timers, full_timeout, handler, &timer_invoked);
assert (timer_id); assert (timer_id);
// Timer should be invoked yet // Timer should not have been invoked yet
int rc = zmq_timers_execute (timers); int rc = zmq_timers_execute (timers);
assert (rc == 0); assert (rc == 0);
assert (!timer_invoked);
#ifdef ZMQ_BUILD_DRAFT_API
if (zmq_stopwatch_intermediate (stopwatch) < full_timeout) {
assert (!timer_invoked);
}
#endif
// Wait half the time and check again // Wait half the time and check again
long timeout = zmq_timers_timeout (timers); long timeout = zmq_timers_timeout (timers);
...@@ -168,7 +177,11 @@ int main (void) ...@@ -168,7 +177,11 @@ int main (void)
msleep (timeout / 2); msleep (timeout / 2);
rc = zmq_timers_execute (timers); rc = zmq_timers_execute (timers);
assert (rc == 0); assert (rc == 0);
assert (!timer_invoked); #ifdef ZMQ_BUILD_DRAFT_API
if (zmq_stopwatch_intermediate (stopwatch) < full_timeout) {
assert (!timer_invoked);
}
#endif
// Wait until the end // Wait until the end
rc = sleep_and_execute (timers); rc = sleep_and_execute (timers);
...@@ -182,7 +195,11 @@ int main (void) ...@@ -182,7 +195,11 @@ int main (void)
msleep (timeout / 2); msleep (timeout / 2);
rc = zmq_timers_execute (timers); rc = zmq_timers_execute (timers);
assert (rc == 0); assert (rc == 0);
assert (!timer_invoked); #ifdef ZMQ_BUILD_DRAFT_API
if (zmq_stopwatch_intermediate (stopwatch) < 2 * full_timeout) {
assert (!timer_invoked);
}
#endif
// Reset timer and wait half of the time left // Reset timer and wait half of the time left
rc = zmq_timers_reset (timers, timer_id); rc = zmq_timers_reset (timers, timer_id);
...@@ -190,7 +207,9 @@ int main (void) ...@@ -190,7 +207,9 @@ int main (void)
msleep (timeout / 2); msleep (timeout / 2);
rc = zmq_timers_execute (timers); rc = zmq_timers_execute (timers);
assert (rc == 0); assert (rc == 0);
assert (!timer_invoked); if (zmq_stopwatch_stop (stopwatch) < 2 * full_timeout) {
assert (!timer_invoked);
}
// Wait until the end // Wait until the end
rc = sleep_and_execute (timers); rc = sleep_and_execute (timers);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment