Commit 11a53886 authored by Simon Giesecke's avatar Simon Giesecke

Problem: data race w.r.t. select_t::stopping

Solution: change termination condition of select_t
parent e6502405
...@@ -52,15 +52,13 @@ ...@@ -52,15 +52,13 @@
#include <climits> #include <climits>
zmq::select_t::select_t (const zmq::thread_ctx_t &ctx_) : zmq::select_t::select_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_), worker_poller_base_t (ctx_),
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
// Fine as long as map is not cleared. // Fine as long as map is not cleared.
current_family_entry_it (family_entries.end ()), current_family_entry_it (family_entries.end ())
#else #else
maxfd (retired_fd), maxfd (retired_fd)
#endif #endif
started (false),
stopping (false)
{ {
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
for (size_t i = 0; i < fd_family_cache_size; ++i) for (size_t i = 0; i < fd_family_cache_size; ++i)
...@@ -70,15 +68,12 @@ zmq::select_t::select_t (const zmq::thread_ctx_t &ctx_) : ...@@ -70,15 +68,12 @@ zmq::select_t::select_t (const zmq::thread_ctx_t &ctx_) :
zmq::select_t::~select_t () zmq::select_t::~select_t ()
{ {
if (started) { stop_worker ();
stop ();
worker.stop ();
}
zmq_assert (get_load () == 0);
} }
zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
{ {
check_thread ();
zmq_assert (fd_ != retired_fd); zmq_assert (fd_ != retired_fd);
fd_entry_t fd_entry; fd_entry_t fd_entry;
...@@ -189,6 +184,7 @@ int zmq::select_t::try_retire_fd_entry ( ...@@ -189,6 +184,7 @@ int zmq::select_t::try_retire_fd_entry (
void zmq::select_t::rm_fd (handle_t handle_) void zmq::select_t::rm_fd (handle_t handle_)
{ {
check_thread ();
int retired = 0; int retired = 0;
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_); u_short family = get_fd_family (handle_);
...@@ -237,6 +233,7 @@ void zmq::select_t::rm_fd (handle_t handle_) ...@@ -237,6 +233,7 @@ void zmq::select_t::rm_fd (handle_t handle_)
void zmq::select_t::set_pollin (handle_t handle_) void zmq::select_t::set_pollin (handle_t handle_)
{ {
check_thread ();
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_); u_short family = get_fd_family (handle_);
wsa_assert (family != AF_UNSPEC); wsa_assert (family != AF_UNSPEC);
...@@ -247,6 +244,7 @@ void zmq::select_t::set_pollin (handle_t handle_) ...@@ -247,6 +244,7 @@ void zmq::select_t::set_pollin (handle_t handle_)
void zmq::select_t::reset_pollin (handle_t handle_) void zmq::select_t::reset_pollin (handle_t handle_)
{ {
check_thread ();
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_); u_short family = get_fd_family (handle_);
wsa_assert (family != AF_UNSPEC); wsa_assert (family != AF_UNSPEC);
...@@ -257,6 +255,7 @@ void zmq::select_t::reset_pollin (handle_t handle_) ...@@ -257,6 +255,7 @@ void zmq::select_t::reset_pollin (handle_t handle_)
void zmq::select_t::set_pollout (handle_t handle_) void zmq::select_t::set_pollout (handle_t handle_)
{ {
check_thread ();
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_); u_short family = get_fd_family (handle_);
wsa_assert (family != AF_UNSPEC); wsa_assert (family != AF_UNSPEC);
...@@ -267,6 +266,7 @@ void zmq::select_t::set_pollout (handle_t handle_) ...@@ -267,6 +266,7 @@ void zmq::select_t::set_pollout (handle_t handle_)
void zmq::select_t::reset_pollout (handle_t handle_) void zmq::select_t::reset_pollout (handle_t handle_)
{ {
check_thread ();
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_); u_short family = get_fd_family (handle_);
wsa_assert (family != AF_UNSPEC); wsa_assert (family != AF_UNSPEC);
...@@ -275,15 +275,10 @@ void zmq::select_t::reset_pollout (handle_t handle_) ...@@ -275,15 +275,10 @@ void zmq::select_t::reset_pollout (handle_t handle_)
FD_CLR (handle_, &family_entry.fds_set.write); FD_CLR (handle_, &family_entry.fds_set.write);
} }
void zmq::select_t::start ()
{
ctx.start_thread (worker, worker_routine, this);
started = true;
}
void zmq::select_t::stop () void zmq::select_t::stop ()
{ {
stopping = true; check_thread ();
// no-op... thread is stopped when no more fds or timers are registered
} }
int zmq::select_t::max_fds () int zmq::select_t::max_fds ()
...@@ -291,30 +286,27 @@ int zmq::select_t::max_fds () ...@@ -291,30 +286,27 @@ int zmq::select_t::max_fds ()
return FD_SETSIZE; return FD_SETSIZE;
} }
// TODO should this be configurable?
const int max_shutdown_timeout = 250;
void zmq::select_t::loop () void zmq::select_t::loop ()
{ {
void *stopwatch = NULL; while (true) {
while (!stopwatch || get_load ()) {
int max_timeout = INT_MAX;
if (stopping) {
if (stopwatch) {
max_timeout = max_shutdown_timeout
- (int) zmq_stopwatch_intermediate (stopwatch);
// bail out eventually, when max_shutdown_timeout has reached,
// to avoid spinning forever in case of some error
zmq_assert (max_timeout > 0);
} else {
stopwatch = zmq_stopwatch_start ();
max_timeout = max_shutdown_timeout;
}
}
// Execute any due timers. // Execute any due timers.
int timeout = std::min ((int) execute_timers (), max_timeout); int timeout = (int) execute_timers ();
cleanup_retired ();
#ifdef _WIN32
if (family_entries.empty ()) {
#else
if (family_entry.fd_entries.empty ()) {
#endif
zmq_assert (get_load () == 0);
if (timeout == 0)
break;
// TODO sleep for timeout
continue;
}
#if defined ZMQ_HAVE_OSX #if defined ZMQ_HAVE_OSX
struct timeval tv = {(long) (timeout / 1000), timeout % 1000 * 1000}; struct timeval tv = {(long) (timeout / 1000), timeout % 1000 * 1000};
...@@ -414,7 +406,6 @@ void zmq::select_t::loop () ...@@ -414,7 +406,6 @@ void zmq::select_t::loop ()
select_family_entry (family_entry, maxfd + 1, timeout > 0, tv); select_family_entry (family_entry, maxfd + 1, timeout > 0, tv);
#endif #endif
} }
zmq_stopwatch_stop (stopwatch);
} }
void zmq::select_t::select_family_entry (family_entry_t &family_entry_, void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
...@@ -442,18 +433,7 @@ void zmq::select_t::select_family_entry (family_entry_t &family_entry_, ...@@ -442,18 +433,7 @@ void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
trigger_events (fd_entries, local_fds_set, rc); trigger_events (fd_entries, local_fds_set, rc);
if (family_entry_.has_retired) { cleanup_retired (family_entry_);
family_entry_.has_retired = false;
family_entry_.fd_entries.erase (std::remove_if (fd_entries.begin (),
fd_entries.end (),
is_retired_fd),
family_entry_.fd_entries.end ());
}
}
void zmq::select_t::worker_routine (void *arg_)
{
((select_t *) arg_)->loop ();
} }
zmq::select_t::fds_set_t::fds_set_t () zmq::select_t::fds_set_t::fds_set_t ()
...@@ -518,6 +498,33 @@ void zmq::select_t::fds_set_t::remove_fd (const fd_t &fd_) ...@@ -518,6 +498,33 @@ void zmq::select_t::fds_set_t::remove_fd (const fd_t &fd_)
FD_CLR (fd_, &error); FD_CLR (fd_, &error);
} }
bool zmq::select_t::cleanup_retired (family_entry_t &family_entry_)
{
if (family_entry_.has_retired) {
family_entry_.has_retired = false;
family_entry_.fd_entries.erase (
std::remove_if (family_entry_.fd_entries.begin (),
family_entry_.fd_entries.end (), is_retired_fd),
family_entry_.fd_entries.end ());
}
return family_entry_.fd_entries.empty ();
}
void zmq::select_t::cleanup_retired ()
{
#ifdef _WIN32
for (family_entries_t::iterator it = family_entries.begin ();
it != family_entries.end ();) {
if (cleanup_retired (it->second))
it = family_entries.erase (it);
else
++it;
}
#else
cleanup_retired (family_entry);
#endif
}
bool zmq::select_t::is_retired_fd (const fd_entry_t &entry) bool zmq::select_t::is_retired_fd (const fd_entry_t &entry)
{ {
return (entry.fd == retired_fd); return (entry.fd == retired_fd);
......
...@@ -58,7 +58,7 @@ struct i_poll_events; ...@@ -58,7 +58,7 @@ struct i_poll_events;
// Implements socket polling mechanism using POSIX.1-2001 select() // Implements socket polling mechanism using POSIX.1-2001 select()
// function. // function.
class select_t : public poller_base_t class select_t : public worker_poller_base_t
{ {
public: public:
typedef fd_t handle_t; typedef fd_t handle_t;
...@@ -73,21 +73,14 @@ class select_t : public poller_base_t ...@@ -73,21 +73,14 @@ class select_t : public poller_base_t
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void start ();
void stop (); void stop ();
static int max_fds (); static int max_fds ();
private: private:
// Main worker thread routine.
static void worker_routine (void *arg_);
// Main event loop. // Main event loop.
void loop (); void loop ();
// Reference to ZMQ context.
const thread_ctx_t &ctx;
// Internal state. // Internal state.
struct fds_set_t struct fds_set_t
{ {
...@@ -159,21 +152,15 @@ class select_t : public poller_base_t ...@@ -159,21 +152,15 @@ class select_t : public poller_base_t
fd_t maxfd; fd_t maxfd;
#endif #endif
void cleanup_retired ();
bool cleanup_retired (family_entry_t &family_entry_);
// Checks if an fd_entry_t is retired. // Checks if an fd_entry_t is retired.
static bool is_retired_fd (const fd_entry_t &entry); static bool is_retired_fd (const fd_entry_t &entry);
static fd_entries_t::iterator static fd_entries_t::iterator
find_fd_entry_by_handle (fd_entries_t &fd_entries, handle_t handle_); find_fd_entry_by_handle (fd_entries_t &fd_entries, handle_t handle_);
// If true, start has been called.
bool started;
// If true, thread is shutting down.
bool stopping;
// Handle of the physical thread doing the I/O work.
thread_t worker;
select_t (const select_t &); select_t (const select_t &);
const select_t &operator= (const select_t &); const select_t &operator= (const select_t &);
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment