Commit 212ab4f8 authored by Jacques Germishuys's avatar Jacques Germishuys

Problem: /dev/poll doesn't compile

Solution: Make devpoll_t derive from worker_poller_base_t
parent bd6fa4bb
...@@ -47,8 +47,7 @@ ...@@ -47,8 +47,7 @@
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::devpoll_t::devpoll_t (const zmq::thread_ctx_t &ctx_) : zmq::devpoll_t::devpoll_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_), worker_poller_base_t (ctx_)
stopping (false)
{ {
devpoll_fd = open ("/dev/poll", O_RDWR); devpoll_fd = open ("/dev/poll", O_RDWR);
errno_assert (devpoll_fd != -1); errno_assert (devpoll_fd != -1);
...@@ -56,7 +55,9 @@ zmq::devpoll_t::devpoll_t (const zmq::thread_ctx_t &ctx_) : ...@@ -56,7 +55,9 @@ zmq::devpoll_t::devpoll_t (const zmq::thread_ctx_t &ctx_) :
zmq::devpoll_t::~devpoll_t () zmq::devpoll_t::~devpoll_t ()
{ {
worker.stop (); // Wait till the worker thread exits.
stop_worker ();
close (devpoll_fd); close (devpoll_fd);
} }
...@@ -70,6 +71,7 @@ void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_) ...@@ -70,6 +71,7 @@ void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_)
zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_,
i_poll_events *reactor_) i_poll_events *reactor_)
{ {
check_thread ();
// If the file descriptor table is too small expand it. // If the file descriptor table is too small expand it.
fd_table_t::size_type sz = fd_table.size (); fd_table_t::size_type sz = fd_table.size ();
if (sz <= (fd_table_t::size_type) fd_) { if (sz <= (fd_table_t::size_type) fd_) {
...@@ -98,6 +100,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, ...@@ -98,6 +100,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_,
void zmq::devpoll_t::rm_fd (handle_t handle_) void zmq::devpoll_t::rm_fd (handle_t handle_)
{ {
check_thread ();
zmq_assert (fd_table[handle_].valid); zmq_assert (fd_table[handle_].valid);
devpoll_ctl (handle_, POLLREMOVE); devpoll_ctl (handle_, POLLREMOVE);
...@@ -109,6 +112,7 @@ void zmq::devpoll_t::rm_fd (handle_t handle_) ...@@ -109,6 +112,7 @@ void zmq::devpoll_t::rm_fd (handle_t handle_)
void zmq::devpoll_t::set_pollin (handle_t handle_) void zmq::devpoll_t::set_pollin (handle_t handle_)
{ {
check_thread ();
devpoll_ctl (handle_, POLLREMOVE); devpoll_ctl (handle_, POLLREMOVE);
fd_table[handle_].events |= POLLIN; fd_table[handle_].events |= POLLIN;
devpoll_ctl (handle_, fd_table[handle_].events); devpoll_ctl (handle_, fd_table[handle_].events);
...@@ -116,6 +120,7 @@ void zmq::devpoll_t::set_pollin (handle_t handle_) ...@@ -116,6 +120,7 @@ void zmq::devpoll_t::set_pollin (handle_t handle_)
void zmq::devpoll_t::reset_pollin (handle_t handle_) void zmq::devpoll_t::reset_pollin (handle_t handle_)
{ {
check_thread ();
devpoll_ctl (handle_, POLLREMOVE); devpoll_ctl (handle_, POLLREMOVE);
fd_table[handle_].events &= ~((short) POLLIN); fd_table[handle_].events &= ~((short) POLLIN);
devpoll_ctl (handle_, fd_table[handle_].events); devpoll_ctl (handle_, fd_table[handle_].events);
...@@ -123,6 +128,7 @@ void zmq::devpoll_t::reset_pollin (handle_t handle_) ...@@ -123,6 +128,7 @@ void zmq::devpoll_t::reset_pollin (handle_t handle_)
void zmq::devpoll_t::set_pollout (handle_t handle_) void zmq::devpoll_t::set_pollout (handle_t handle_)
{ {
check_thread ();
devpoll_ctl (handle_, POLLREMOVE); devpoll_ctl (handle_, POLLREMOVE);
fd_table[handle_].events |= POLLOUT; fd_table[handle_].events |= POLLOUT;
devpoll_ctl (handle_, fd_table[handle_].events); devpoll_ctl (handle_, fd_table[handle_].events);
...@@ -130,19 +136,15 @@ void zmq::devpoll_t::set_pollout (handle_t handle_) ...@@ -130,19 +136,15 @@ void zmq::devpoll_t::set_pollout (handle_t handle_)
void zmq::devpoll_t::reset_pollout (handle_t handle_) void zmq::devpoll_t::reset_pollout (handle_t handle_)
{ {
check_thread ();
devpoll_ctl (handle_, POLLREMOVE); devpoll_ctl (handle_, POLLREMOVE);
fd_table[handle_].events &= ~((short) POLLOUT); fd_table[handle_].events &= ~((short) POLLOUT);
devpoll_ctl (handle_, fd_table[handle_].events); devpoll_ctl (handle_, fd_table[handle_].events);
} }
void zmq::devpoll_t::start ()
{
ctx.start_thread (worker, worker_routine, this);
}
void zmq::devpoll_t::stop () void zmq::devpoll_t::stop ()
{ {
stopping = true; check_thread ();
} }
int zmq::devpoll_t::max_fds () int zmq::devpoll_t::max_fds ()
...@@ -152,7 +154,7 @@ int zmq::devpoll_t::max_fds () ...@@ -152,7 +154,7 @@ int zmq::devpoll_t::max_fds ()
void zmq::devpoll_t::loop () void zmq::devpoll_t::loop ()
{ {
while (!stopping) { while (true) {
struct pollfd ev_buf[max_io_events]; struct pollfd ev_buf[max_io_events];
struct dvpoll poll_req; struct dvpoll poll_req;
...@@ -163,6 +165,14 @@ void zmq::devpoll_t::loop () ...@@ -163,6 +165,14 @@ void zmq::devpoll_t::loop ()
// Execute any due timers. // Execute any due timers.
int timeout = (int) execute_timers (); int timeout = (int) execute_timers ();
if (get_load () == 0) {
if (timeout == 0)
break;
// TODO sleep for timeout
continue;
}
// Wait for events. // Wait for events.
// On Solaris, we can retrieve no more then (OPEN_MAX - 1) events. // On Solaris, we can retrieve no more then (OPEN_MAX - 1) events.
poll_req.dp_fds = &ev_buf[0]; poll_req.dp_fds = &ev_buf[0];
...@@ -195,9 +205,4 @@ void zmq::devpoll_t::loop () ...@@ -195,9 +205,4 @@ void zmq::devpoll_t::loop ()
} }
} }
void zmq::devpoll_t::worker_routine (void *arg_)
{
((devpoll_t *) arg_)->loop ();
}
#endif #endif
...@@ -47,7 +47,7 @@ struct i_poll_events; ...@@ -47,7 +47,7 @@ struct i_poll_events;
// Implements socket polling mechanism using the "/dev/poll" interface. // Implements socket polling mechanism using the "/dev/poll" interface.
class devpoll_t : public poller_base_t class devpoll_t : public worker_poller_base_t
{ {
public: public:
typedef fd_t handle_t; typedef fd_t handle_t;
...@@ -62,21 +62,14 @@ class devpoll_t : public poller_base_t ...@@ -62,21 +62,14 @@ class devpoll_t : public poller_base_t
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void start ();
void stop (); void stop ();
static int max_fds (); static int max_fds ();
private: private:
// Main worker thread routine.
static void worker_routine (void *arg_);
// Main event loop. // Main event loop.
void loop (); void loop ();
// Reference to ZMQ context.
const thread_ctx_t &ctx;
// File descriptor referring to "/dev/poll" pseudo-device. // File descriptor referring to "/dev/poll" pseudo-device.
fd_t devpoll_fd; fd_t devpoll_fd;
...@@ -97,12 +90,6 @@ class devpoll_t : public poller_base_t ...@@ -97,12 +90,6 @@ class devpoll_t : public poller_base_t
// Pollset manipulation function. // Pollset manipulation function.
void devpoll_ctl (fd_t fd_, short events_); void devpoll_ctl (fd_t fd_, short events_);
// If true, thread is in the process of shutting down.
bool stopping;
// Handle of the physical thread doing the I/O work.
thread_t worker;
devpoll_t (const devpoll_t &); devpoll_t (const devpoll_t &);
const devpoll_t &operator= (const devpoll_t &); const devpoll_t &operator= (const devpoll_t &);
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment