Commit 40398587 authored by Doron Somech's avatar Doron Somech Committed by GitHub

Merge pull request #2112 from hnwyllmm/pollset

Pollset
parents 50e277bd 3a597117
...@@ -77,7 +77,7 @@ if (WITH_MILITANT) ...@@ -77,7 +77,7 @@ if (WITH_MILITANT)
endif() endif()
set (POLLER "" CACHE STRING "Choose polling system. valid values are set (POLLER "" CACHE STRING "Choose polling system. valid values are
kqueue, epoll, devpoll, poll or select [default=autodetect]") kqueue, pollset, epoll, devpoll, poll or select [default=autodetect]")
include (CheckFunctionExists) include (CheckFunctionExists)
include (CheckTypeSize) include (CheckTypeSize)
...@@ -91,6 +91,15 @@ if (POLLER STREQUAL "") ...@@ -91,6 +91,15 @@ if (POLLER STREQUAL "")
endif() endif()
endif () endif ()
if (POLLER STREQUAL "")
set (CMAKE_REQUIRED_INCLUDES sys/pollset.h)
check_function_exists (pollset_create HAVE_POLLSET)
set (CMAKE_REQUIRED_INCLUDES)
if (HAVE_POLLSET)
set (POLLER "pollset")
endif()
endif ()
if (POLLER STREQUAL "") if (POLLER STREQUAL "")
set (CMAKE_REQUIRED_INCLUDES sys/epoll.h) set (CMAKE_REQUIRED_INCLUDES sys/epoll.h)
check_function_exists (epoll_create HAVE_EPOLL) check_function_exists (epoll_create HAVE_EPOLL)
...@@ -136,6 +145,7 @@ if (POLLER STREQUAL "") ...@@ -136,6 +145,7 @@ if (POLLER STREQUAL "")
endif () endif ()
if (POLLER STREQUAL "kqueue" if (POLLER STREQUAL "kqueue"
OR POLLER STREQUAL "pollset"
OR POLLER STREQUAL "epoll" OR POLLER STREQUAL "epoll"
OR POLLER STREQUAL "devpoll" OR POLLER STREQUAL "devpoll"
OR POLLER STREQUAL "poll" OR POLLER STREQUAL "poll"
...@@ -468,6 +478,7 @@ set (cxx-sources ...@@ -468,6 +478,7 @@ set (cxx-sources
plain_server.cpp plain_server.cpp
poll.cpp poll.cpp
poller_base.cpp poller_base.cpp
pollset.cpp
precompiled.cpp precompiled.cpp
proxy.cpp proxy.cpp
pub.cpp pub.cpp
......
...@@ -133,6 +133,8 @@ src_libzmq_la_SOURCES = \ ...@@ -133,6 +133,8 @@ src_libzmq_la_SOURCES = \
src/poller.hpp \ src/poller.hpp \
src/poller_base.cpp \ src/poller_base.cpp \
src/poller_base.hpp \ src/poller_base.hpp \
src/pollset.cpp \
src/pollset.hpp \
src/precompiled.cpp \ src/precompiled.cpp \
src/precompiled.hpp \ src/precompiled.hpp \
src/proxy.cpp \ src/proxy.cpp \
......
...@@ -794,6 +794,22 @@ kqueue(); ...@@ -794,6 +794,22 @@ kqueue();
) )
}]) }])
dnl ################################################################################
dnl # LIBZMQ_CHECK_POLLER_POLLSET([action-if-found], [action-if-not-found]) #
dnl # Checks pollset polling system #
dnl ################################################################################
AC_DEFUN([LIBZMQ_CHECK_POLLER_POLLSET], [{
AC_LINK_IFELSE([
AC_LANG_PROGRAM([
#include <sys/poll.h>
#include <sys/pollset.h>
],[[
pollset_t ps = pollset_create(-1);
]])],
[$1], [$2]
)
}])
dnl ################################################################################ dnl ################################################################################
dnl # LIBZMQ_CHECK_POLLER_EPOLL_RUN([action-if-found], [action-if-not-found]) # dnl # LIBZMQ_CHECK_POLLER_EPOLL_RUN([action-if-found], [action-if-not-found]) #
dnl # Checks epoll polling system can actually run # dnl # Checks epoll polling system can actually run #
...@@ -892,7 +908,7 @@ AC_DEFUN([LIBZMQ_CHECK_POLLER], [{ ...@@ -892,7 +908,7 @@ AC_DEFUN([LIBZMQ_CHECK_POLLER], [{
# Allow user to override poller autodetection # Allow user to override poller autodetection
AC_ARG_WITH([poller], AC_ARG_WITH([poller],
[AS_HELP_STRING([--with-poller], [AS_HELP_STRING([--with-poller],
[choose polling system manually. Valid values are 'kqueue', 'epoll', 'devpoll', 'poll', 'select', or 'auto'. [default=auto]])]) [choose polling system manually. Valid values are 'kqueue', 'pollset', 'epoll', 'devpoll', 'poll', 'select', or 'auto'. [default=auto]])])
if test "x$with_poller" == "x"; then if test "x$with_poller" == "x"; then
pollers=auto pollers=auto
...@@ -901,7 +917,7 @@ AC_DEFUN([LIBZMQ_CHECK_POLLER], [{ ...@@ -901,7 +917,7 @@ AC_DEFUN([LIBZMQ_CHECK_POLLER], [{
fi fi
if test "$pollers" == "auto"; then if test "$pollers" == "auto"; then
# We search for pollers in this order # We search for pollers in this order
pollers="kqueue epoll devpoll poll select" pollers="kqueue pollset epoll devpoll poll select"
fi fi
# try to find suitable polling system. the order of testing is: # try to find suitable polling system. the order of testing is:
...@@ -916,6 +932,13 @@ AC_DEFUN([LIBZMQ_CHECK_POLLER], [{ ...@@ -916,6 +932,13 @@ AC_DEFUN([LIBZMQ_CHECK_POLLER], [{
poller_found=1 poller_found=1
]) ])
;; ;;
pollset)
LIBZMQ_CHECK_POLLER_POLLSET([
AC_MSG_NOTICE([Using 'pollset' polling system])
AC_DEFINE(ZMQ_USE_POLLSET, 1, [Use 'pollset' polling system])
poller_found=1
])
;;
epoll) epoll)
LIBZMQ_CHECK_POLLER_EPOLL([ LIBZMQ_CHECK_POLLER_EPOLL([
AC_MSG_NOTICE([Using 'epoll' polling system]) AC_MSG_NOTICE([Using 'epoll' polling system])
......
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "pollset.hpp"
#if defined ZMQ_USE_POLLSET
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <algorithm>
#include <new>
#include "macros.hpp"
#include "err.hpp"
#include "config.hpp"
#include "i_poll_events.hpp"
zmq::pollset_t::pollset_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
stopping (false)
{
pollset_fd = pollset_create (-1);
errno_assert (pollset_fd != -1);
}
zmq::pollset_t::~pollset_t ()
{
// Wait till the worker thread exits.
worker.stop ();
close (pollset_fd);
for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it)
LIBZMQ_DELETE(*it);
}
zmq::pollset_t::handle_t zmq::pollset_t::add_fd (fd_t fd_, i_poll_events *events_)
{
poll_entry_t *pe = new (std::nothrow) poll_entry_t;
alloc_assert (pe);
pe->fd = fd_;
pe->flag_pollin = false;
pe->flag_pollout = false;
pe->events = events_;
struct poll_ctl pc;
pc.fd = fd_;
pc.cmd = PS_ADD;
pc.events = 0;
int rc = pollset_ctl (pollset_fd, &pc, 1);
errno_assert (rc != -1);
// Increase the load metric of the thread.
adjust_load (1);
fd_table.resize(fd_ + 1, NULL);
fd_table [fd_] = pe;
return pe;
}
void zmq::pollset_t::rm_fd (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_;
struct poll_ctl pc;
pc.fd = pe->fd;
pc.cmd = PS_DELETE;
pc.events = 0;
pollset_ctl (pollset_fd, &pc, 1);
pe->fd = retired_fd;
retired.push_back (pe);
// Decrease the load metric of the thread.
adjust_load (-1);
fd_table [pe->fd] = NULL;
}
void zmq::pollset_t::set_pollin (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_;
if (likely (!pe->flag_pollin)) {
struct poll_ctl pc;
pc.fd = pe->fd;
pc.cmd = PS_MOD;
pc.events = POLLIN;
const int rc = pollset_ctl (pollset_fd, &pc, 1);
errno_assert (rc != -1);
pe->flag_pollin = true;
}
}
void zmq::pollset_t::reset_pollin (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_;
if (unlikely(!pe->flag_pollin)) {
return;
}
struct poll_ctl pc;
pc.fd = pe->fd;
pc.events = 0;
if (pe->flag_pollout) {
pc.cmd = PS_DELETE;
pc.events = POLLOUT;
pollset_ctl(pollset_fd, &pc, 1);
}
pc.cmd = PS_MOD;
int rc = pollset_ctl (pollset_fd, &pc, 1);
errno_assert (rc != -1);
pe->flag_pollin = false;
}
void zmq::pollset_t::set_pollout (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_;
if (likely (!pe->flag_pollout)) {
struct poll_ctl pc;
pc.fd = pe->fd;
pc.cmd = PS_MOD;
pc.events = POLLOUT;
const int rc = pollset_ctl (pollset_fd, &pc, 1);
errno_assert (rc != -1);
pe->flag_pollout = true;
}
}
void zmq::pollset_t::reset_pollout (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_;
if (unlikely(!pe->flag_pollout)) {
return;
}
struct poll_ctl pc;
pc.fd = pe->fd;
pc.events = 0;
if (pe->flag_pollin) {
pc.cmd = PS_DELETE;
pc.events = POLLIN;
pollset_ctl (pollset_fd, &pc, 1);
}
pc.cmd = PS_MOD;
int rc = pollset_ctl (pollset_fd, &pc, 1);
errno_assert (rc != -1);
pe->flag_pollout = false;
}
void zmq::pollset_t::start ()
{
ctx.start_thread (worker, worker_routine, this);
}
void zmq::pollset_t::stop ()
{
stopping = true;
}
int zmq::pollset_t::max_fds ()
{
return -1;
}
void zmq::pollset_t::loop ()
{
struct pollfd polldata_array[max_io_events];
while (!stopping) {
// Execute any due timers.
int timeout = (int) execute_timers ();
// Wait for events.
int n = pollset_poll(pollset_fd, polldata_array, max_io_events,
timeout ? timeout : -1);
if (n == -1) {
errno_assert (errno == EINTR);
continue;
}
for (int i = 0; i < n; i ++) {
poll_entry_t *pe = fd_table [polldata_array [i].fd];
if (!pe)
continue;
if (pe->fd == retired_fd)
continue;
if (polldata_array [i].revents & (POLLERR | POLLHUP))
pe->events->in_event ();
if (pe->fd == retired_fd)
continue;
if (polldata_array [i].revents & POLLOUT)
pe->events->out_event ();
if (pe->fd == retired_fd)
continue;
if (polldata_array [i].revents & POLLIN)
pe->events->in_event ();
}
// Destroy retired event sources.
for (retired_t::iterator it = retired.begin (); it != retired.end ();
++it)
LIBZMQ_DELETE(*it);
retired.clear ();
}
}
void zmq::pollset_t::worker_routine (void *arg_)
{
((pollset_t*) arg_)->loop ();
}
#endif
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_POLLSET_HPP_INCLUDED__
#define __ZMQ_POLLSET_HPP_INCLUDED__
// poller.hpp decides which polling mechanism to use.
#include "poller.hpp"
#if defined ZMQ_USE_POLLSET
#include <sys/poll.h>
#include <sys/pollset.h>
#include <vector>
#include "ctx.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "poller_base.hpp"
namespace zmq
{
struct i_poll_events;
// This class implements socket polling mechanism using the AIX-specific
// pollset mechanism.
class pollset_t : public poller_base_t
{
public:
typedef void* handle_t;
pollset_t (const ctx_t &ctx_);
~pollset_t ();
// "poller" concept.
handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
void start ();
void stop ();
static int max_fds ();
private:
// Main worker thread routine.
static void worker_routine (void *arg_);
// Main event loop.
void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
// Main pollset file descriptor
::pollset_t pollset_fd;
struct poll_entry_t
{
fd_t fd;
bool flag_pollin;
bool flag_pollout;
zmq::i_poll_events *events;
};
// List of retired event sources.
typedef std::vector <poll_entry_t*> retired_t;
retired_t retired;
// This table stores data for registered descriptors.
typedef std::vector <poll_entry_t*> fd_table_t;
fd_table_t fd_table;
// If true, thread is in the process of shutting down.
bool stopping;
// Handle of the physical thread doing the I/O work.
thread_t worker;
pollset_t (const pollset_t&);
const pollset_t &operator = (const pollset_t&);
};
typedef pollset_t poller_t;
}
#endif
#endif
...@@ -30,14 +30,16 @@ ...@@ -30,14 +30,16 @@
#ifndef __ZMQ_POLLER_HPP_INCLUDED__ #ifndef __ZMQ_POLLER_HPP_INCLUDED__
#define __ZMQ_POLLER_HPP_INCLUDED__ #define __ZMQ_POLLER_HPP_INCLUDED__
#if defined ZMQ_USE_KQUEUE + defined ZMQ_USE_EPOLL \ #if defined ZMQ_USE_KQUEUE + defined ZMQ_USE_POLLSET \
+ defined ZMQ_USE_DEVPOLL + defined ZMQ_USE_POLL \ + defined ZMQ_USE_EPOLL + defined ZMQ_USE_DEVPOLL \
+ defined ZMQ_USE_SELECT > 1 + defined ZMQ_USE_POLL + defined ZMQ_USE_SELECT > 1
#error More than one of the ZMQ_USE_* macros defined #error More than one of the ZMQ_USE_* macros defined
#endif #endif
#if defined ZMQ_USE_KQUEUE #if defined ZMQ_USE_KQUEUE
# include "kqueue.hpp" # include "kqueue.hpp"
#elif defined ZMQ_USE_POLLSET
# include "pollset.hpp"
#elif defined ZMQ_USE_EPOLL #elif defined ZMQ_USE_EPOLL
# include "epoll.hpp" # include "epoll.hpp"
#elif defined ZMQ_USE_DEVPOLL #elif defined ZMQ_USE_DEVPOLL
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment