Commit e6502405 authored by Simon Giesecke's avatar Simon Giesecke

Problem: data race w.r.t. poll_t::stopping

Solution: remove stopping, stop on thread-safe conditions; add
additional checks for correct thread-safe usage
parent 08201bc1
...@@ -44,20 +44,19 @@ ...@@ -44,20 +44,19 @@
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::poll_t::poll_t (const zmq::thread_ctx_t &ctx_) : zmq::poll_t::poll_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_), worker_poller_base_t (ctx_),
retired (false), retired (false)
stopping (false)
{ {
} }
zmq::poll_t::~poll_t () zmq::poll_t::~poll_t ()
{ {
stop (); stop_worker ();
worker.stop ();
} }
zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
{ {
check_thread ();
zmq_assert (fd_ != retired_fd); zmq_assert (fd_ != retired_fd);
// If the file descriptor table is too small expand it. // If the file descriptor table is too small expand it.
...@@ -85,6 +84,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) ...@@ -85,6 +84,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
void zmq::poll_t::rm_fd (handle_t handle_) void zmq::poll_t::rm_fd (handle_t handle_)
{ {
check_thread ();
fd_t index = fd_table[handle_].index; fd_t index = fd_table[handle_].index;
zmq_assert (index != retired_fd); zmq_assert (index != retired_fd);
...@@ -99,36 +99,36 @@ void zmq::poll_t::rm_fd (handle_t handle_) ...@@ -99,36 +99,36 @@ void zmq::poll_t::rm_fd (handle_t handle_)
void zmq::poll_t::set_pollin (handle_t handle_) void zmq::poll_t::set_pollin (handle_t handle_)
{ {
check_thread ();
fd_t index = fd_table[handle_].index; fd_t index = fd_table[handle_].index;
pollset[index].events |= POLLIN; pollset[index].events |= POLLIN;
} }
void zmq::poll_t::reset_pollin (handle_t handle_) void zmq::poll_t::reset_pollin (handle_t handle_)
{ {
check_thread ();
fd_t index = fd_table[handle_].index; fd_t index = fd_table[handle_].index;
pollset[index].events &= ~((short) POLLIN); pollset[index].events &= ~((short) POLLIN);
} }
void zmq::poll_t::set_pollout (handle_t handle_) void zmq::poll_t::set_pollout (handle_t handle_)
{ {
check_thread ();
fd_t index = fd_table[handle_].index; fd_t index = fd_table[handle_].index;
pollset[index].events |= POLLOUT; pollset[index].events |= POLLOUT;
} }
void zmq::poll_t::reset_pollout (handle_t handle_) void zmq::poll_t::reset_pollout (handle_t handle_)
{ {
check_thread ();
fd_t index = fd_table[handle_].index; fd_t index = fd_table[handle_].index;
pollset[index].events &= ~((short) POLLOUT); pollset[index].events &= ~((short) POLLOUT);
} }
void zmq::poll_t::start ()
{
ctx.start_thread (worker, worker_routine, this);
}
void zmq::poll_t::stop () void zmq::poll_t::stop ()
{ {
stopping = true; check_thread ();
// no-op... thread is stopped when no more fds or timers are registered
} }
int zmq::poll_t::max_fds () int zmq::poll_t::max_fds ()
...@@ -138,14 +138,19 @@ int zmq::poll_t::max_fds () ...@@ -138,14 +138,19 @@ int zmq::poll_t::max_fds ()
void zmq::poll_t::loop () void zmq::poll_t::loop ()
{ {
while (!stopping) { while (true) {
// Execute any due timers. // Execute any due timers.
int timeout = (int) execute_timers (); int timeout = (int) execute_timers ();
cleanup_retired (); cleanup_retired ();
if (pollset.empty ()) { if (pollset.empty ()) {
// TODO yield? or sleep for timeout? zmq_assert (get_load () == 0);
if (timeout == 0)
break;
// TODO sleep for timeout
continue; continue;
} }
...@@ -200,9 +205,5 @@ void zmq::poll_t::cleanup_retired () ...@@ -200,9 +205,5 @@ void zmq::poll_t::cleanup_retired ()
} }
} }
void zmq::poll_t::worker_routine (void *arg_)
{
((poll_t *) arg_)->loop ();
}
#endif #endif
...@@ -52,7 +52,7 @@ struct i_poll_events; ...@@ -52,7 +52,7 @@ struct i_poll_events;
// Implements socket polling mechanism using the POSIX.1-2001 // Implements socket polling mechanism using the POSIX.1-2001
// poll() system call. // poll() system call.
class poll_t : public poller_base_t class poll_t : public worker_poller_base_t
{ {
public: public:
typedef fd_t handle_t; typedef fd_t handle_t;
...@@ -61,28 +61,22 @@ class poll_t : public poller_base_t ...@@ -61,28 +61,22 @@ class poll_t : public poller_base_t
~poll_t (); ~poll_t ();
// "poller" concept. // "poller" concept.
// These methods may only be called from an event callback; add_fd may also be called before start.
handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_); handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
void rm_fd (handle_t handle_); void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_); void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void start ();
void stop (); void stop ();
static int max_fds (); static int max_fds ();
private: private:
// Main worker thread routine.
static void worker_routine (void *arg_);
// Main event loop. // Main event loop.
void loop (); virtual void loop ();
void cleanup_retired();
// Reference to ZMQ context. void cleanup_retired ();
const thread_ctx_t &ctx;
struct fd_entry_t struct fd_entry_t
{ {
...@@ -101,12 +95,6 @@ class poll_t : public poller_base_t ...@@ -101,12 +95,6 @@ class poll_t : public poller_base_t
// If true, there's at least one retired event source. // If true, there's at least one retired event source.
bool retired; bool retired;
// If true, thread is in the process of shutting down.
bool stopping;
// Handle of the physical thread doing the I/O work.
thread_t worker;
poll_t (const poll_t &); poll_t (const poll_t &);
const poll_t &operator= (const poll_t &); const poll_t &operator= (const poll_t &);
}; };
......
...@@ -106,3 +106,31 @@ uint64_t zmq::poller_base_t::execute_timers () ...@@ -106,3 +106,31 @@ uint64_t zmq::poller_base_t::execute_timers ()
// There are no more timers. // There are no more timers.
return 0; return 0;
} }
zmq::worker_poller_base_t::worker_poller_base_t (const thread_ctx_t &ctx_) :
ctx (ctx_)
{
}
void zmq::worker_poller_base_t::stop_worker ()
{
worker.stop ();
}
void zmq::worker_poller_base_t::start ()
{
zmq_assert (get_load () > 0);
ctx.start_thread (worker, worker_routine, this);
}
void zmq::worker_poller_base_t::check_thread ()
{
#ifdef _DEBUG
zmq_assert (!worker.get_started () || worker.is_current_thread ());
#endif
}
void zmq::worker_poller_base_t::worker_routine (void *arg_)
{
((worker_poller_base_t *) arg_)->loop ();
}
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "clock.hpp" #include "clock.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
#include "ctx.hpp"
namespace zmq namespace zmq
{ {
...@@ -85,6 +86,32 @@ class poller_base_t ...@@ -85,6 +86,32 @@ class poller_base_t
poller_base_t (const poller_base_t &); poller_base_t (const poller_base_t &);
const poller_base_t &operator= (const poller_base_t &); const poller_base_t &operator= (const poller_base_t &);
}; };
// base class for a poller with a single worker thread.
class worker_poller_base_t : public poller_base_t
{
public:
worker_poller_base_t (const thread_ctx_t &ctx_);
void stop_worker ();
// Starts the poller.
void start ();
protected:
void check_thread ();
private:
// Main worker thread routine.
static void worker_routine (void *arg_);
virtual void loop () = 0;
// Reference to ZMQ context.
const thread_ctx_t &ctx;
// Handle of the physical thread doing the I/O work.
thread_t worker;
};
} }
#endif #endif
...@@ -32,6 +32,11 @@ ...@@ -32,6 +32,11 @@
#include "thread.hpp" #include "thread.hpp"
#include "err.hpp" #include "err.hpp"
bool zmq::thread_t::get_started () const
{
return started;
}
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
extern "C" { extern "C" {
...@@ -62,6 +67,11 @@ void zmq::thread_t::start (thread_fn *tfn_, void *arg_) ...@@ -62,6 +67,11 @@ void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
started = true; started = true;
} }
bool zmq::thread_t::is_current_thread () const
{
return GetCurrentThreadId () == GetThreadId (descriptor);
}
void zmq::thread_t::stop () void zmq::thread_t::stop ()
{ {
if (started) { if (started) {
...@@ -130,6 +140,11 @@ void zmq::thread_t::stop () ...@@ -130,6 +140,11 @@ void zmq::thread_t::stop ()
} }
} }
bool zmq::thread_t::is_current_thread () const
{
return pthread_self () == descriptor;
}
void zmq::thread_t::setSchedulingParameters ( void zmq::thread_t::setSchedulingParameters (
int priority_, int schedulingPolicy_, const std::set<int> &affinity_cpus_) int priority_, int schedulingPolicy_, const std::set<int> &affinity_cpus_)
{ {
......
...@@ -62,6 +62,13 @@ class thread_t ...@@ -62,6 +62,13 @@ class thread_t
// 'arg' as an argument. // 'arg' as an argument.
void start (thread_fn *tfn_, void *arg_); void start (thread_fn *tfn_, void *arg_);
// Returns whether the thread was started, i.e. start was called.
bool get_started () const;
// Returns whether the executing thread is the thread represented by the
// thread object.
bool is_current_thread () const;
// Waits for thread termination. // Waits for thread termination.
void stop (); void stop ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment