Unverified Commit e57afec8 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #2943 from sigiesec/fix-select-unix

Problem: data races in select/poll poller implementations
parents a30133d8 c62574ff
......@@ -44,20 +44,19 @@
#include "i_poll_events.hpp"
zmq::poll_t::poll_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_),
retired (false),
stopping (false)
worker_poller_base_t (ctx_),
retired (false)
{
}
zmq::poll_t::~poll_t ()
{
stop ();
worker.stop ();
stop_worker ();
}
zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
{
check_thread ();
zmq_assert (fd_ != retired_fd);
// If the file descriptor table is too small expand it.
......@@ -85,6 +84,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
void zmq::poll_t::rm_fd (handle_t handle_)
{
check_thread ();
fd_t index = fd_table[handle_].index;
zmq_assert (index != retired_fd);
......@@ -99,36 +99,36 @@ void zmq::poll_t::rm_fd (handle_t handle_)
// Requests read-readiness (POLLIN) notifications for the descriptor
// identified by handle_.
void zmq::poll_t::set_pollin (handle_t handle_)
{
    check_thread ();

    // The fd table maps the handle to its slot in the pollset.
    const fd_t slot = fd_table[handle_].index;
    pollset[slot].events |= POLLIN;
}
// Cancels read-readiness (POLLIN) notifications for the descriptor
// identified by handle_.
void zmq::poll_t::reset_pollin (handle_t handle_)
{
    check_thread ();

    // The fd table maps the handle to its slot in the pollset.
    const fd_t slot = fd_table[handle_].index;
    // Clear only the POLLIN bit; every other requested event is kept.
    pollset[slot].events &= ~static_cast<short> (POLLIN);
}
// Requests write-readiness (POLLOUT) notifications for the descriptor
// identified by handle_.
void zmq::poll_t::set_pollout (handle_t handle_)
{
    check_thread ();

    // The fd table maps the handle to its slot in the pollset.
    const fd_t slot = fd_table[handle_].index;
    pollset[slot].events |= POLLOUT;
}
// Cancels write-readiness (POLLOUT) notifications for the descriptor
// identified by handle_.
void zmq::poll_t::reset_pollout (handle_t handle_)
{
    check_thread ();

    // The fd table maps the handle to its slot in the pollset.
    const fd_t slot = fd_table[handle_].index;
    // Clear only the POLLOUT bit; every other requested event is kept.
    pollset[slot].events &= ~static_cast<short> (POLLOUT);
}
void zmq::poll_t::start ()
{
ctx.start_thread (worker, worker_routine, this);
}
void zmq::poll_t::stop ()
{
stopping = true;
check_thread ();
// no-op... thread is stopped when no more fds or timers are registered
}
int zmq::poll_t::max_fds ()
......@@ -138,14 +138,19 @@ int zmq::poll_t::max_fds ()
void zmq::poll_t::loop ()
{
while (!stopping) {
while (true) {
// Execute any due timers.
int timeout = (int) execute_timers ();
cleanup_retired ();
if (pollset.empty ()) {
// TODO yield? or sleep for timeout?
zmq_assert (get_load () == 0);
if (timeout == 0)
break;
// TODO sleep for timeout
continue;
}
......@@ -200,9 +205,5 @@ void zmq::poll_t::cleanup_retired ()
}
}
void zmq::poll_t::worker_routine (void *arg_)
{
((poll_t *) arg_)->loop ();
}
#endif
......@@ -52,7 +52,7 @@ struct i_poll_events;
// Implements socket polling mechanism using the POSIX.1-2001
// poll() system call.
class poll_t : public poller_base_t
class poll_t : public worker_poller_base_t
{
public:
typedef fd_t handle_t;
......@@ -61,28 +61,22 @@ class poll_t : public poller_base_t
~poll_t ();
// "poller" concept.
// These methods may only be called from an event callback; add_fd may also be called before start.
handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
void start ();
void stop ();
static int max_fds ();
private:
// Main worker thread routine.
static void worker_routine (void *arg_);
// Main event loop.
void loop ();
void cleanup_retired();
virtual void loop ();
// Reference to ZMQ context.
const thread_ctx_t &ctx;
void cleanup_retired ();
struct fd_entry_t
{
......@@ -101,12 +95,6 @@ class poll_t : public poller_base_t
// If true, there's at least one retired event source.
bool retired;
// If true, thread is in the process of shutting down.
bool stopping;
// Handle of the physical thread doing the I/O work.
thread_t worker;
poll_t (const poll_t &);
const poll_t &operator= (const poll_t &);
};
......
......@@ -106,3 +106,31 @@ uint64_t zmq::poller_base_t::execute_timers ()
// There are no more timers.
return 0;
}
// Stores a reference to the thread context; start () later uses it to
// launch the worker thread. No thread is created here.
zmq::worker_poller_base_t::worker_poller_base_t (const thread_ctx_t &ctx_) :
ctx (ctx_)
{
}
// Forwards to thread_t::stop on the worker thread handle (documented in
// thread.hpp as waiting for thread termination).
void zmq::worker_poller_base_t::stop_worker ()
{
worker.stop ();
}
// Starts the poller: launches the worker thread through the context, with
// worker_routine as the entry point and this object as its argument.
void zmq::worker_poller_base_t::start ()
{
// The poller must already carry load when the thread is started.
// NOTE(review): presumably because the loop () implementations return
// once nothing is registered and no timer is pending - confirm.
zmq_assert (get_load () > 0);
ctx.start_thread (worker, worker_routine, this);
}
// Debug-only sanity check of the calling thread: the assertion fails if the
// worker thread has been started and the caller is not that worker thread.
// The body is compiled out unless _DEBUG is defined.
void zmq::worker_poller_base_t::check_thread ()
{
#ifdef _DEBUG
zmq_assert (!worker.get_started () || worker.is_current_thread ());
#endif
}
void zmq::worker_poller_base_t::worker_routine (void *arg_)
{
((worker_poller_base_t *) arg_)->loop ();
}
......@@ -34,6 +34,7 @@
#include "clock.hpp"
#include "atomic_counter.hpp"
#include "ctx.hpp"
namespace zmq
{
......@@ -85,6 +86,32 @@ class poller_base_t
poller_base_t (const poller_base_t &);
const poller_base_t &operator= (const poller_base_t &);
};
// Base class for a poller that runs its event loop on a single worker
// thread. Derived classes provide the loop by implementing loop ().
class worker_poller_base_t : public poller_base_t
{
public:
// Keeps a reference to ctx_, which start () uses to launch the worker thread.
worker_poller_base_t (const thread_ctx_t &ctx_);
// Calls stop () on the worker thread handle (see thread_t::stop, which
// waits for thread termination).
void stop_worker ();
// Starts the poller by launching the worker thread; asserts that the
// poller already has load.
void start ();
protected:
// In builds with _DEBUG defined, asserts that the worker thread has either
// not been started yet or is the calling thread. Does nothing otherwise.
void check_thread ();
private:
// Main worker thread routine; invokes loop () on the poller passed as arg_.
static void worker_routine (void *arg_);
// Event loop executed by worker_routine; implemented by derived classes.
virtual void loop () = 0;
// Reference to ZMQ context.
const thread_ctx_t &ctx;
// Handle of the physical thread doing the I/O work.
thread_t worker;
};
}
#endif
......@@ -52,15 +52,13 @@
#include <climits>
zmq::select_t::select_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_),
worker_poller_base_t (ctx_),
#if defined ZMQ_HAVE_WINDOWS
// Fine as long as map is not cleared.
current_family_entry_it (family_entries.end ()),
current_family_entry_it (family_entries.end ())
#else
maxfd (retired_fd),
maxfd (retired_fd)
#endif
started (false),
stopping (false)
{
#if defined ZMQ_HAVE_WINDOWS
for (size_t i = 0; i < fd_family_cache_size; ++i)
......@@ -70,15 +68,12 @@ zmq::select_t::select_t (const zmq::thread_ctx_t &ctx_) :
zmq::select_t::~select_t ()
{
if (started) {
stop ();
worker.stop ();
}
zmq_assert (get_load () == 0);
stop_worker ();
}
zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
{
check_thread ();
zmq_assert (fd_ != retired_fd);
fd_entry_t fd_entry;
......@@ -123,13 +118,14 @@ void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_,
// Size is cached to avoid iteration through recently added descriptors.
for (fd_entries_t::size_type i = 0, size = fd_entries_.size ();
i < size && event_count_ > 0; ++i) {
const fd_entry_t &current_fd_entry = fd_entries_[i];
// A reference to fd_entries_[i] must not be kept across the callbacks,
// since calls to in_event/out_event may reallocate the vector
if (is_retired_fd (current_fd_entry))
if (is_retired_fd (fd_entries_[i]))
continue;
if (FD_ISSET (current_fd_entry.fd, &local_fds_set_.read)) {
current_fd_entry.events->in_event ();
if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.read)) {
fd_entries_[i].events->in_event ();
--event_count_;
}
......@@ -137,20 +133,20 @@ void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_,
// was retired before, we would already have continued, and I
// don't see where it might have been modified
// And if rc == 0, we can break instead of continuing
if (is_retired_fd (current_fd_entry) || event_count_ == 0)
if (is_retired_fd (fd_entries_[i]) || event_count_ == 0)
continue;
if (FD_ISSET (current_fd_entry.fd, &local_fds_set_.write)) {
current_fd_entry.events->out_event ();
if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.write)) {
fd_entries_[i].events->out_event ();
--event_count_;
}
// TODO: same as above
if (is_retired_fd (current_fd_entry) || event_count_ == 0)
if (is_retired_fd (fd_entries_[i]) || event_count_ == 0)
continue;
if (FD_ISSET (current_fd_entry.fd, &local_fds_set_.error)) {
current_fd_entry.events->in_event ();
if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.error)) {
fd_entries_[i].events->in_event ();
--event_count_;
}
}
......@@ -189,9 +185,10 @@ int zmq::select_t::try_retire_fd_entry (
void zmq::select_t::rm_fd (handle_t handle_)
{
check_thread ();
int retired = 0;
#if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_);
int retired = 0;
if (family != AF_UNSPEC) {
family_entries_t::iterator family_entry_it =
family_entries.find (family);
......@@ -237,6 +234,7 @@ void zmq::select_t::rm_fd (handle_t handle_)
void zmq::select_t::set_pollin (handle_t handle_)
{
check_thread ();
#if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_);
wsa_assert (family != AF_UNSPEC);
......@@ -247,6 +245,7 @@ void zmq::select_t::set_pollin (handle_t handle_)
void zmq::select_t::reset_pollin (handle_t handle_)
{
check_thread ();
#if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_);
wsa_assert (family != AF_UNSPEC);
......@@ -257,6 +256,7 @@ void zmq::select_t::reset_pollin (handle_t handle_)
void zmq::select_t::set_pollout (handle_t handle_)
{
check_thread ();
#if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_);
wsa_assert (family != AF_UNSPEC);
......@@ -267,6 +267,7 @@ void zmq::select_t::set_pollout (handle_t handle_)
void zmq::select_t::reset_pollout (handle_t handle_)
{
check_thread ();
#if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_);
wsa_assert (family != AF_UNSPEC);
......@@ -275,15 +276,10 @@ void zmq::select_t::reset_pollout (handle_t handle_)
FD_CLR (handle_, &family_entry.fds_set.write);
}
void zmq::select_t::start ()
{
ctx.start_thread (worker, worker_routine, this);
started = true;
}
void zmq::select_t::stop ()
{
stopping = true;
check_thread ();
// no-op... thread is stopped when no more fds or timers are registered
}
int zmq::select_t::max_fds ()
......@@ -291,30 +287,27 @@ int zmq::select_t::max_fds ()
return FD_SETSIZE;
}
// TODO should this be configurable?
const int max_shutdown_timeout = 250;
void zmq::select_t::loop ()
{
void *stopwatch = NULL;
while (!stopwatch || get_load ()) {
int max_timeout = INT_MAX;
if (stopping) {
if (stopwatch) {
max_timeout = max_shutdown_timeout
- (int) zmq_stopwatch_intermediate (stopwatch);
// bail out eventually, when max_shutdown_timeout has reached,
// to avoid spinning forever in case of some error
zmq_assert (max_timeout > 0);
} else {
stopwatch = zmq_stopwatch_start ();
max_timeout = max_shutdown_timeout;
}
}
while (true) {
// Execute any due timers.
int timeout = std::min ((int) execute_timers (), max_timeout);
int timeout = (int) execute_timers ();
cleanup_retired ();
#ifdef _WIN32
if (family_entries.empty ()) {
#else
if (family_entry.fd_entries.empty ()) {
#endif
zmq_assert (get_load () == 0);
if (timeout == 0)
break;
// TODO sleep for timeout
continue;
}
#if defined ZMQ_HAVE_OSX
struct timeval tv = {(long) (timeout / 1000), timeout % 1000 * 1000};
......@@ -411,10 +404,9 @@ void zmq::select_t::loop ()
}
}
#else
select_family_entry (family_entry, maxfd, timeout > 0, tv);
select_family_entry (family_entry, maxfd + 1, timeout > 0, tv);
#endif
}
zmq_stopwatch_stop (stopwatch);
}
void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
......@@ -442,18 +434,7 @@ void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
trigger_events (fd_entries, local_fds_set, rc);
if (family_entry_.has_retired) {
family_entry_.has_retired = false;
family_entry_.fd_entries.erase (std::remove_if (fd_entries.begin (),
fd_entries.end (),
is_retired_fd),
family_entry_.fd_entries.end ());
}
}
void zmq::select_t::worker_routine (void *arg_)
{
((select_t *) arg_)->loop ();
cleanup_retired (family_entry_);
}
zmq::select_t::fds_set_t::fds_set_t ()
......@@ -518,11 +499,43 @@ void zmq::select_t::fds_set_t::remove_fd (const fd_t &fd_)
FD_CLR (fd_, &error);
}
// Purges retired descriptors from a single family entry.
// Returns true when the entry holds no descriptors afterwards, whether or
// not anything had to be removed.
bool zmq::select_t::cleanup_retired (family_entry_t &family_entry_)
{
    fd_entries_t &entries = family_entry_.fd_entries;

    if (family_entry_.has_retired) {
        // Reset the flag first, then compact the vector so that only the
        // entries that are not retired remain.
        family_entry_.has_retired = false;
        const fd_entries_t::iterator first_retired =
          std::remove_if (entries.begin (), entries.end (), is_retired_fd);
        entries.erase (first_retired, entries.end ());
    }

    return entries.empty ();
}
// Purges retired descriptors from every family entry owned by the poller.
//
// The platform guard uses ZMQ_HAVE_WINDOWS, the same macro that selects
// between the per-family members and the single family_entry member
// elsewhere in this file, so this function always refers to members that
// exist in the current configuration.
void zmq::select_t::cleanup_retired ()
{
#if defined ZMQ_HAVE_WINDOWS
    // Descriptors are grouped per address family: purge each family and
    // drop the families that end up without any descriptor.
    // NOTE(review): erasing a family invalidates iterators to it; confirm
    // current_family_entry_it never refers to an erased family here.
    for (family_entries_t::iterator it = family_entries.begin ();
         it != family_entries.end ();) {
        if (cleanup_retired (it->second))
            it = family_entries.erase (it);
        else
            ++it;
    }
#else
    // All descriptors are kept in a single family entry.
    cleanup_retired (family_entry);
#endif
}
// Predicate telling whether an entry is retired, i.e. whether its
// descriptor equals retired_fd. Used as the remove_if predicate when
// retired entries are purged.
bool zmq::select_t::is_retired_fd (const fd_entry_t &entry)
{
    return entry.fd == retired_fd;
}
// A new family entry starts with has_retired cleared, so cleanup_retired
// has nothing to purge from it until the flag is set.
zmq::select_t::family_entry_t::family_entry_t () : has_retired (false)
{
}
#if defined ZMQ_HAVE_WINDOWS
u_short zmq::select_t::get_fd_family (fd_t fd_)
{
......@@ -579,11 +592,6 @@ u_short zmq::select_t::determine_fd_family (fd_t fd_)
return AF_UNSPEC;
}
zmq::select_t::family_entry_t::family_entry_t () : has_retired (false)
{
}
zmq::select_t::wsa_events_t::wsa_events_t ()
{
events[0] = WSACreateEvent ();
......
......@@ -58,7 +58,7 @@ struct i_poll_events;
// Implements socket polling mechanism using POSIX.1-2001 select()
// function.
class select_t : public poller_base_t
class select_t : public worker_poller_base_t
{
public:
typedef fd_t handle_t;
......@@ -73,21 +73,14 @@ class select_t : public poller_base_t
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
void start ();
void stop ();
static int max_fds ();
private:
// Main worker thread routine.
static void worker_routine (void *arg_);
// Main event loop.
void loop ();
// Reference to ZMQ context.
const thread_ctx_t &ctx;
// Internal state.
struct fds_set_t
{
......@@ -115,11 +108,7 @@ class select_t : public poller_base_t
struct family_entry_t
{
#ifndef ZMQ_HAVE_WINDOWS
family_entry_t () {};
#else
family_entry_t ();
#endif
fd_entries_t fd_entries;
fds_set_t fds_set;
......@@ -161,24 +150,17 @@ class select_t : public poller_base_t
// on non-Windows, we can treat all fds as one family
family_entry_t family_entry;
fd_t maxfd;
bool retired;
#endif
void cleanup_retired ();
bool cleanup_retired (family_entry_t &family_entry_);
// Checks if an fd_entry_t is retired.
static bool is_retired_fd (const fd_entry_t &entry);
static fd_entries_t::iterator
find_fd_entry_by_handle (fd_entries_t &fd_entries, handle_t handle_);
// If true, start has been called.
bool started;
// If true, thread is shutting down.
bool stopping;
// Handle of the physical thread doing the I/O work.
thread_t worker;
select_t (const select_t &);
const select_t &operator= (const select_t &);
};
......
......@@ -32,6 +32,11 @@
#include "thread.hpp"
#include "err.hpp"
// Returns the started flag, which start () sets to true.
bool zmq::thread_t::get_started () const
{
return started;
}
#ifdef ZMQ_HAVE_WINDOWS
extern "C" {
......@@ -62,6 +67,11 @@ void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
started = true;
}
// Returns true when the calling thread is the thread referred to by
// descriptor, by comparing the caller's thread id with the id obtained
// from the descriptor.
// NOTE(review): presumably only meaningful once start () has been called;
// worker_poller_base_t::check_thread tests get_started () first - confirm
// for other callers.
bool zmq::thread_t::is_current_thread () const
{
return GetCurrentThreadId () == GetThreadId (descriptor);
}
void zmq::thread_t::stop ()
{
if (started) {
......@@ -130,6 +140,11 @@ void zmq::thread_t::stop ()
}
}
// Returns true when the calling thread is the thread referred to by
// descriptor.
bool zmq::thread_t::is_current_thread () const
{
    // pthread_t is an opaque type that is not guaranteed to be comparable
    // with ==; pthread_equal is the portable way to compare thread ids
    // and returns non-zero when both ids refer to the same thread.
    return pthread_equal (pthread_self (), descriptor) != 0;
}
void zmq::thread_t::setSchedulingParameters (
int priority_, int schedulingPolicy_, const std::set<int> &affinity_cpus_)
{
......
......@@ -62,6 +62,13 @@ class thread_t
// 'arg' as an argument.
void start (thread_fn *tfn_, void *arg_);
// Returns whether the thread was started, i.e. start was called.
bool get_started () const;
// Returns whether the executing thread is the thread represented by the
// thread object.
bool is_current_thread () const;
// Waits for thread termination.
void stop ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment