Commit 90c6d993 authored by Simon Giesecke's avatar Simon Giesecke

Problem: kqueue_t fails unittest_poller

Solution: fix shutdown of kqueue_t
parent 3b90ad8c
...@@ -55,8 +55,7 @@ ...@@ -55,8 +55,7 @@
#endif #endif
zmq::kqueue_t::kqueue_t (const zmq::thread_ctx_t &ctx_) : zmq::kqueue_t::kqueue_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_), worker_poller_base_t (ctx_)
stopping (false)
{ {
// Create event queue // Create event queue
kqueue_fd = kqueue (); kqueue_fd = kqueue ();
...@@ -68,12 +67,13 @@ zmq::kqueue_t::kqueue_t (const zmq::thread_ctx_t &ctx_) : ...@@ -68,12 +67,13 @@ zmq::kqueue_t::kqueue_t (const zmq::thread_ctx_t &ctx_) :
zmq::kqueue_t::~kqueue_t () zmq::kqueue_t::~kqueue_t ()
{ {
worker.stop (); stop_worker ();
close (kqueue_fd); close (kqueue_fd);
} }
void zmq::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_) void zmq::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_)
{ {
check_thread ();
struct kevent ev; struct kevent ev;
EV_SET (&ev, fd_, filter_, EV_ADD, 0, 0, (kevent_udata_t) udata_); EV_SET (&ev, fd_, filter_, EV_ADD, 0, 0, (kevent_udata_t) udata_);
...@@ -93,6 +93,7 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) ...@@ -93,6 +93,7 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
i_poll_events *reactor_) i_poll_events *reactor_)
{ {
check_thread ();
poll_entry_t *pe = new (std::nothrow) poll_entry_t; poll_entry_t *pe = new (std::nothrow) poll_entry_t;
alloc_assert (pe); alloc_assert (pe);
...@@ -108,6 +109,7 @@ zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, ...@@ -108,6 +109,7 @@ zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
void zmq::kqueue_t::rm_fd (handle_t handle_) void zmq::kqueue_t::rm_fd (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
if (pe->flag_pollin) if (pe->flag_pollin)
kevent_delete (pe->fd, EVFILT_READ); kevent_delete (pe->fd, EVFILT_READ);
...@@ -121,6 +123,7 @@ void zmq::kqueue_t::rm_fd (handle_t handle_) ...@@ -121,6 +123,7 @@ void zmq::kqueue_t::rm_fd (handle_t handle_)
void zmq::kqueue_t::set_pollin (handle_t handle_) void zmq::kqueue_t::set_pollin (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
if (likely (!pe->flag_pollin)) { if (likely (!pe->flag_pollin)) {
pe->flag_pollin = true; pe->flag_pollin = true;
...@@ -130,6 +133,7 @@ void zmq::kqueue_t::set_pollin (handle_t handle_) ...@@ -130,6 +133,7 @@ void zmq::kqueue_t::set_pollin (handle_t handle_)
void zmq::kqueue_t::reset_pollin (handle_t handle_) void zmq::kqueue_t::reset_pollin (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
if (likely (pe->flag_pollin)) { if (likely (pe->flag_pollin)) {
pe->flag_pollin = false; pe->flag_pollin = false;
...@@ -139,6 +143,7 @@ void zmq::kqueue_t::reset_pollin (handle_t handle_) ...@@ -139,6 +143,7 @@ void zmq::kqueue_t::reset_pollin (handle_t handle_)
void zmq::kqueue_t::set_pollout (handle_t handle_) void zmq::kqueue_t::set_pollout (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
if (likely (!pe->flag_pollout)) { if (likely (!pe->flag_pollout)) {
pe->flag_pollout = true; pe->flag_pollout = true;
...@@ -148,6 +153,7 @@ void zmq::kqueue_t::set_pollout (handle_t handle_) ...@@ -148,6 +153,7 @@ void zmq::kqueue_t::set_pollout (handle_t handle_)
void zmq::kqueue_t::reset_pollout (handle_t handle_) void zmq::kqueue_t::reset_pollout (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
if (likely (pe->flag_pollout)) { if (likely (pe->flag_pollout)) {
pe->flag_pollout = false; pe->flag_pollout = false;
...@@ -155,14 +161,8 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_) ...@@ -155,14 +161,8 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_)
} }
} }
void zmq::kqueue_t::start ()
{
ctx.start_thread (worker, worker_routine, this);
}
void zmq::kqueue_t::stop () void zmq::kqueue_t::stop ()
{ {
stopping = true;
} }
int zmq::kqueue_t::max_fds () int zmq::kqueue_t::max_fds ()
...@@ -172,10 +172,18 @@ int zmq::kqueue_t::max_fds () ...@@ -172,10 +172,18 @@ int zmq::kqueue_t::max_fds ()
void zmq::kqueue_t::loop () void zmq::kqueue_t::loop ()
{ {
while (!stopping) { while (true) {
// Execute any due timers. // Execute any due timers.
int timeout = (int) execute_timers (); int timeout = (int) execute_timers ();
if (get_load () == 0) {
if (timeout == 0)
break;
// TODO sleep for timeout
continue;
}
// Wait for events. // Wait for events.
struct kevent ev_buf[max_io_events]; struct kevent ev_buf[max_io_events];
timespec ts = {timeout / 1000, (timeout % 1000) * 1000000}; timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
...@@ -219,9 +227,4 @@ void zmq::kqueue_t::loop () ...@@ -219,9 +227,4 @@ void zmq::kqueue_t::loop ()
} }
} }
void zmq::kqueue_t::worker_routine (void *arg_)
{
((kqueue_t *) arg_)->loop ();
}
#endif #endif
...@@ -49,7 +49,7 @@ struct i_poll_events; ...@@ -49,7 +49,7 @@ struct i_poll_events;
// Implements socket polling mechanism using the BSD-specific // Implements socket polling mechanism using the BSD-specific
// kqueue interface. // kqueue interface.
class kqueue_t : public poller_base_t class kqueue_t : public worker_poller_base_t
{ {
public: public:
typedef void *handle_t; typedef void *handle_t;
...@@ -64,21 +64,14 @@ class kqueue_t : public poller_base_t ...@@ -64,21 +64,14 @@ class kqueue_t : public poller_base_t
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void start ();
void stop (); void stop ();
static int max_fds (); static int max_fds ();
private: private:
// Main worker thread routine.
static void worker_routine (void *arg_);
// Main event loop. // Main event loop.
void loop (); void loop ();
// Reference to ZMQ context.
const thread_ctx_t &ctx;
// File descriptor referring to the kernel event queue. // File descriptor referring to the kernel event queue.
fd_t kqueue_fd; fd_t kqueue_fd;
...@@ -100,12 +93,6 @@ class kqueue_t : public poller_base_t ...@@ -100,12 +93,6 @@ class kqueue_t : public poller_base_t
typedef std::vector<poll_entry_t *> retired_t; typedef std::vector<poll_entry_t *> retired_t;
retired_t retired; retired_t retired;
// If true, thread is in the process of shutting down.
bool stopping;
// Handle of the physical thread doing the I/O work.
thread_t worker;
kqueue_t (const kqueue_t &); kqueue_t (const kqueue_t &);
const kqueue_t &operator= (const kqueue_t &); const kqueue_t &operator= (const kqueue_t &);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment