Commit 8d7bf668 authored by Martin Sustrik's avatar Martin Sustrik

common base for all pollers created; the only thing it handles at the moment is 'load'

parent cf815e8c
...@@ -89,6 +89,7 @@ libzmq_la_SOURCES = \ ...@@ -89,6 +89,7 @@ libzmq_la_SOURCES = \
platform.hpp \ platform.hpp \
poll.hpp \ poll.hpp \
poller.hpp \ poller.hpp \
poller_base.hpp \
pair.hpp \ pair.hpp \
pub.hpp \ pub.hpp \
pull.hpp \ pull.hpp \
...@@ -148,6 +149,7 @@ libzmq_la_SOURCES = \ ...@@ -148,6 +149,7 @@ libzmq_la_SOURCES = \
pgm_socket.cpp \ pgm_socket.cpp \
pipe.cpp \ pipe.cpp \
poll.cpp \ poll.cpp \
poller_base.cpp \
pull.cpp \ pull.cpp \
push.cpp \ push.cpp \
pub.cpp \ pub.cpp \
......
...@@ -56,10 +56,6 @@ zmq::devpoll_t::devpoll_t () : ...@@ -56,10 +56,6 @@ zmq::devpoll_t::devpoll_t () :
zmq::devpoll_t::~devpoll_t () zmq::devpoll_t::~devpoll_t ()
{ {
worker.stop (); worker.stop ();
// Make sure there are no fds registered on shutdown.
zmq_assert (load.get () == 0);
close (devpoll_fd); close (devpoll_fd);
} }
...@@ -84,7 +80,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, ...@@ -84,7 +80,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_,
pending_list.push_back (fd_); pending_list.push_back (fd_);
// Increase the load metric of the thread. // Increase the load metric of the thread.
load.add (1); adjust_load (1);
return fd_; return fd_;
} }
...@@ -97,7 +93,7 @@ void zmq::devpoll_t::rm_fd (handle_t handle_) ...@@ -97,7 +93,7 @@ void zmq::devpoll_t::rm_fd (handle_t handle_)
fd_table [handle_].valid = false; fd_table [handle_].valid = false;
// Decrease the load metric of the thread. // Decrease the load metric of the thread.
load.sub (1); adjust_load (-1);
} }
void zmq::devpoll_t::set_pollin (handle_t handle_) void zmq::devpoll_t::set_pollin (handle_t handle_)
...@@ -140,11 +136,6 @@ void zmq::devpoll_t::cancel_timer (i_poll_events *events_, int id_) ...@@ -140,11 +136,6 @@ void zmq::devpoll_t::cancel_timer (i_poll_events *events_, int id_)
timers.erase (it); timers.erase (it);
} }
int zmq::devpoll_t::get_load ()
{
return load.get ();
}
void zmq::devpoll_t::start () void zmq::devpoll_t::start ()
{ {
worker.start (worker_routine, this); worker.start (worker_routine, this);
......
...@@ -28,14 +28,14 @@ ...@@ -28,14 +28,14 @@
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "atomic_counter.hpp" #include "poller_base.hpp"
namespace zmq namespace zmq
{ {
// Implements socket polling mechanism using the "/dev/poll" interface. // Implements socket polling mechanism using the "/dev/poll" interface.
class devpoll_t class devpoll_t : public poller_base_t
{ {
public: public:
...@@ -53,7 +53,6 @@ namespace zmq ...@@ -53,7 +53,6 @@ namespace zmq
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (int timeout_, struct i_poll_events *events_, int id_); void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_, int id_); void cancel_timer (struct i_poll_events *events_, int id_);
int get_load ();
void start (); void start ();
void stop (); void stop ();
...@@ -94,10 +93,6 @@ namespace zmq ...@@ -94,10 +93,6 @@ namespace zmq
// Handle of the physical thread doing the I/O work. // Handle of the physical thread doing the I/O work.
thread_t worker; thread_t worker;
// Load of the poller. Currently number of file descriptors
// registered with the poller.
atomic_counter_t load;
devpoll_t (const devpoll_t&); devpoll_t (const devpoll_t&);
void operator = (const devpoll_t&); void operator = (const devpoll_t&);
}; };
......
...@@ -45,9 +45,6 @@ zmq::epoll_t::~epoll_t () ...@@ -45,9 +45,6 @@ zmq::epoll_t::~epoll_t ()
// Wait till the worker thread exits. // Wait till the worker thread exits.
worker.stop (); worker.stop ();
// Make sure there are no fds registered on shutdown.
zmq_assert (load.get () == 0);
close (epoll_fd); close (epoll_fd);
for (retired_t::iterator it = retired.begin (); it != retired.end (); it ++) for (retired_t::iterator it = retired.begin (); it != retired.end (); it ++)
delete *it; delete *it;
...@@ -71,7 +68,7 @@ zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) ...@@ -71,7 +68,7 @@ zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
errno_assert (rc != -1); errno_assert (rc != -1);
// Increase the load metric of the thread. // Increase the load metric of the thread.
load.add (1); adjust_load (1);
return pe; return pe;
} }
...@@ -85,7 +82,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_) ...@@ -85,7 +82,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_)
retired.push_back (pe); retired.push_back (pe);
// Decrease the load metric of the thread. // Decrease the load metric of the thread.
load.sub (1); adjust_load (-1);
} }
void zmq::epoll_t::set_pollin (handle_t handle_) void zmq::epoll_t::set_pollin (handle_t handle_)
...@@ -133,11 +130,6 @@ void zmq::epoll_t::cancel_timer (i_poll_events *events_, int id_) ...@@ -133,11 +130,6 @@ void zmq::epoll_t::cancel_timer (i_poll_events *events_, int id_)
timers.erase (it); timers.erase (it);
} }
int zmq::epoll_t::get_load ()
{
return load.get ();
}
void zmq::epoll_t::start () void zmq::epoll_t::start ()
{ {
worker.start (worker_routine, this); worker.start (worker_routine, this);
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "atomic_counter.hpp" #include "poller_base.hpp"
namespace zmq namespace zmq
{ {
...@@ -37,7 +37,7 @@ namespace zmq ...@@ -37,7 +37,7 @@ namespace zmq
// This class implements socket polling mechanism using the Linux-specific // This class implements socket polling mechanism using the Linux-specific
// epoll mechanism. // epoll mechanism.
class epoll_t class epoll_t : public poller_base_t
{ {
public: public:
...@@ -55,7 +55,6 @@ namespace zmq ...@@ -55,7 +55,6 @@ namespace zmq
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (int timeout_, struct i_poll_events *events_, int id_); void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_, int id_); void cancel_timer (struct i_poll_events *events_, int id_);
int get_load ();
void start (); void start ();
void stop (); void stop ();
...@@ -91,10 +90,6 @@ namespace zmq ...@@ -91,10 +90,6 @@ namespace zmq
// Handle of the physical thread doing the I/O work. // Handle of the physical thread doing the I/O work.
thread_t worker; thread_t worker;
// Load of the poller. Currently number of file descriptors
// registered with the poller.
atomic_counter_t load;
epoll_t (const epoll_t&); epoll_t (const epoll_t&);
void operator = (const epoll_t&); void operator = (const epoll_t&);
}; };
......
...@@ -54,10 +54,6 @@ zmq::kqueue_t::kqueue_t () : ...@@ -54,10 +54,6 @@ zmq::kqueue_t::kqueue_t () :
zmq::kqueue_t::~kqueue_t () zmq::kqueue_t::~kqueue_t ()
{ {
worker.stop (); worker.stop ();
// Make sure there are no fds registered on shutdown.
zmq_assert (load.get () == 0);
close (kqueue_fd); close (kqueue_fd);
} }
...@@ -74,7 +70,7 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) ...@@ -74,7 +70,7 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
{ {
struct kevent ev; struct kevent ev;
EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, (kevent_udata_t)NULL); EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, (kevent_udata_t) NULL);
int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL); int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
errno_assert (rc != -1); errno_assert (rc != -1);
} }
...@@ -90,6 +86,8 @@ zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, ...@@ -90,6 +86,8 @@ zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
pe->flag_pollout = 0; pe->flag_pollout = 0;
pe->reactor = reactor_; pe->reactor = reactor_;
adjust_load (1);
return pe; return pe;
} }
...@@ -102,6 +100,8 @@ void zmq::kqueue_t::rm_fd (handle_t handle_) ...@@ -102,6 +100,8 @@ void zmq::kqueue_t::rm_fd (handle_t handle_)
kevent_delete (pe->fd, EVFILT_WRITE); kevent_delete (pe->fd, EVFILT_WRITE);
pe->fd = retired_fd; pe->fd = retired_fd;
retired.push_back (pe); retired.push_back (pe);
adjust_load (-1);
} }
void zmq::kqueue_t::set_pollin (handle_t handle_) void zmq::kqueue_t::set_pollin (handle_t handle_)
...@@ -144,11 +144,6 @@ void zmq::kqueue_t::cancel_timer (i_poll_events *events_, int id_) ...@@ -144,11 +144,6 @@ void zmq::kqueue_t::cancel_timer (i_poll_events *events_, int id_)
timers.erase (it); timers.erase (it);
} }
int zmq::kqueue_t::get_load ()
{
return load.get ();
}
void zmq::kqueue_t::start () void zmq::kqueue_t::start ()
{ {
worker.start (worker_routine, this); worker.start (worker_routine, this);
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "atomic_counter.hpp" #include "poller_base.hpp"
namespace zmq namespace zmq
{ {
...@@ -37,7 +37,7 @@ namespace zmq ...@@ -37,7 +37,7 @@ namespace zmq
// Implements socket polling mechanism using the BSD-specific // Implements socket polling mechanism using the BSD-specific
// kqueue interface. // kqueue interface.
class kqueue_t class kqueue_t : public poller_base_t
{ {
public: public:
...@@ -55,7 +55,6 @@ namespace zmq ...@@ -55,7 +55,6 @@ namespace zmq
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (int timeout_, struct i_poll_events *events_, int id_); void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_, int id_); void cancel_timer (struct i_poll_events *events_, int id_);
int get_load ();
void start (); void start ();
void stop (); void stop ();
...@@ -98,10 +97,6 @@ namespace zmq ...@@ -98,10 +97,6 @@ namespace zmq
// Handle of the physical thread doing the I/O work. // Handle of the physical thread doing the I/O work.
thread_t worker; thread_t worker;
// Load of the poller. Currently number of file descriptors
// registered with the poller.
atomic_counter_t load;
kqueue_t (const kqueue_t&); kqueue_t (const kqueue_t&);
void operator = (const kqueue_t&); void operator = (const kqueue_t&);
}; };
......
...@@ -54,9 +54,6 @@ zmq::poll_t::poll_t () : ...@@ -54,9 +54,6 @@ zmq::poll_t::poll_t () :
zmq::poll_t::~poll_t () zmq::poll_t::~poll_t ()
{ {
worker.stop (); worker.stop ();
// Make sure there are no fds registered on shutdown.
zmq_assert (load.get () == 0);
} }
zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
...@@ -69,7 +66,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) ...@@ -69,7 +66,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
fd_table [fd_].events = events_; fd_table [fd_].events = events_;
// Increase the load metric of the thread. // Increase the load metric of the thread.
load.add (1); adjust_load (1);
return fd_; return fd_;
} }
...@@ -85,7 +82,7 @@ void zmq::poll_t::rm_fd (handle_t handle_) ...@@ -85,7 +82,7 @@ void zmq::poll_t::rm_fd (handle_t handle_)
retired = true; retired = true;
// Decrease the load metric of the thread. // Decrease the load metric of the thread.
load.sub (1); adjust_load (-1);
} }
void zmq::poll_t::set_pollin (handle_t handle_) void zmq::poll_t::set_pollin (handle_t handle_)
...@@ -124,11 +121,6 @@ void zmq::poll_t::cancel_timer (i_poll_events *events_, int id_) ...@@ -124,11 +121,6 @@ void zmq::poll_t::cancel_timer (i_poll_events *events_, int id_)
timers.erase (it); timers.erase (it);
} }
int zmq::poll_t::get_load ()
{
return load.get ();
}
void zmq::poll_t::start () void zmq::poll_t::start ()
{ {
worker.start (worker_routine, this); worker.start (worker_routine, this);
......
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "atomic_counter.hpp" #include "poller_base.hpp"
namespace zmq namespace zmq
{ {
...@@ -42,7 +42,7 @@ namespace zmq ...@@ -42,7 +42,7 @@ namespace zmq
// Implements socket polling mechanism using the POSIX.1-2001 // Implements socket polling mechanism using the POSIX.1-2001
// poll() system call. // poll() system call.
class poll_t class poll_t : public poller_base_t
{ {
public: public:
...@@ -60,7 +60,6 @@ namespace zmq ...@@ -60,7 +60,6 @@ namespace zmq
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (int timeout_, struct i_poll_events *events_, int id_); void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_, int id_); void cancel_timer (struct i_poll_events *events_, int id_);
int get_load ();
void start (); void start ();
void stop (); void stop ();
...@@ -98,10 +97,6 @@ namespace zmq ...@@ -98,10 +97,6 @@ namespace zmq
// Handle of the physical thread doing the I/O work. // Handle of the physical thread doing the I/O work.
thread_t worker; thread_t worker;
// Load of the poller. Currently number of file descriptors
// registered with the poller.
atomic_counter_t load;
poll_t (const poll_t&); poll_t (const poll_t&);
void operator = (const poll_t&); void operator = (const poll_t&);
}; };
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "poller_base.hpp"
#include "err.hpp"
zmq::poller_base_t::poller_base_t ()
{
}
zmq::poller_base_t::~poller_base_t ()
{
// Make sure there are no fds registered on shutdown.
zmq_assert (get_load () == 0);
}
int zmq::poller_base_t::get_load ()
{
return load.get ();
}
void zmq::poller_base_t::adjust_load (int amount_)
{
if (amount_ > 0)
load.add (amount_);
else if (amount_ < 0)
load.sub (-amount_);
}
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_POLLER_BASE_HPP_INCLUDED__
#define __ZMQ_POLLER_BASE_HPP_INCLUDED__
#include "atomic_counter.hpp"
namespace zmq
{
class poller_base_t
{
public:
poller_base_t ();
~poller_base_t ();
// Returns load of the poller. Note that this function can be
// invoked from a different thread!
int get_load ();
protected:
// Called by individual poller implementations to manage the load.
void adjust_load (int amount_);
private:
// Load of the poller. Currently the number of file descriptors
// registered.
atomic_counter_t load;
poller_base_t (const poller_base_t&);
void operator = (const poller_base_t&);
};
}
#endif
...@@ -54,9 +54,6 @@ zmq::select_t::select_t () : ...@@ -54,9 +54,6 @@ zmq::select_t::select_t () :
zmq::select_t::~select_t () zmq::select_t::~select_t ()
{ {
worker.stop (); worker.stop ();
// Make sure there are no fds registered on shutdown.
zmq_assert (load.get () == 0);
} }
zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
...@@ -77,7 +74,7 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) ...@@ -77,7 +74,7 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
maxfd = fd_; maxfd = fd_;
// Increase the load metric of the thread. // Increase the load metric of the thread.
load.add (1); adjust_load (1);
return fd_; return fd_;
} }
...@@ -113,7 +110,7 @@ void zmq::select_t::rm_fd (handle_t handle_) ...@@ -113,7 +110,7 @@ void zmq::select_t::rm_fd (handle_t handle_)
} }
// Decrease the load metric of the thread. // Decrease the load metric of the thread.
load.sub (1); adjust_load (-1);
} }
void zmq::select_t::set_pollin (handle_t handle_) void zmq::select_t::set_pollin (handle_t handle_)
...@@ -148,11 +145,6 @@ void zmq::select_t::cancel_timer (i_poll_events *events_, int id_) ...@@ -148,11 +145,6 @@ void zmq::select_t::cancel_timer (i_poll_events *events_, int id_)
timers.erase (it); timers.erase (it);
} }
int zmq::select_t::get_load ()
{
return load.get ();
}
void zmq::select_t::start () void zmq::select_t::start ()
{ {
worker.start (worker_routine, this); worker.start (worker_routine, this);
......
...@@ -36,7 +36,7 @@ ...@@ -36,7 +36,7 @@
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "atomic_counter.hpp" #include "poller_base.hpp"
namespace zmq namespace zmq
{ {
...@@ -44,7 +44,7 @@ namespace zmq ...@@ -44,7 +44,7 @@ namespace zmq
// Implements socket polling mechanism using POSIX.1-2001 select() // Implements socket polling mechanism using POSIX.1-2001 select()
// function. // function.
class select_t class select_t : public poller_base_t
{ {
public: public:
...@@ -62,7 +62,6 @@ namespace zmq ...@@ -62,7 +62,6 @@ namespace zmq
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (int timeout_, struct i_poll_events *events_, int id_); void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_, int id_); void cancel_timer (struct i_poll_events *events_, int id_);
int get_load ();
void start (); void start ();
void stop (); void stop ();
...@@ -109,10 +108,6 @@ namespace zmq ...@@ -109,10 +108,6 @@ namespace zmq
// Handle of the physical thread doing the I/O work. // Handle of the physical thread doing the I/O work.
thread_t worker; thread_t worker;
// Load of the poller. Currently number of file descriptors
// registered with the poller.
atomic_counter_t load;
select_t (const select_t&); select_t (const select_t&);
void operator = (const select_t&); void operator = (const select_t&);
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment