Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
4efe2366
Commit
4efe2366
authored
Oct 02, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
poller is a concept now rather than virtualised class
parent
4a3b4dad
Hide whitespace changes
Inline
Side-by-side
Showing
17 changed files
with
170 additions
and
232 deletions
+170
-232
Makefile.am
src/Makefile.am
+1
-1
devpoll.cpp
src/devpoll.cpp
+18
-23
devpoll.hpp
src/devpoll.hpp
+7
-6
epoll.cpp
src/epoll.cpp
+7
-9
epoll.hpp
src/epoll.hpp
+7
-7
i_poller.hpp
src/i_poller.hpp
+0
-84
io_object.cpp
src/io_object.cpp
+1
-1
io_object.hpp
src/io_object.hpp
+4
-2
io_thread.cpp
src/io_thread.cpp
+2
-39
io_thread.hpp
src/io_thread.hpp
+4
-4
kqueue.cpp
src/kqueue.cpp
+8
-9
kqueue.hpp
src/kqueue.hpp
+7
-6
poll.cpp
src/poll.cpp
+8
-10
poll.hpp
src/poll.hpp
+7
-6
poller.hpp
src/poller.hpp
+68
-0
select.cpp
src/select.cpp
+14
-19
select.hpp
src/select.hpp
+7
-6
No files found.
src/Makefile.am
View file @
4efe2366
...
...
@@ -85,7 +85,6 @@ libzmq_la_SOURCES = $(pgm_sources) \
ip.hpp
\
i_endpoint.hpp
\
i_engine.hpp
\
i_poller.hpp
\
i_poll_events.hpp
\
i_signaler.hpp
\
kqueue.hpp
\
...
...
@@ -100,6 +99,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
pipe.hpp
\
platform.hpp
\
poll.hpp
\
poller.hpp
\
p2p.hpp
\
pub.hpp
\
rep.hpp
\
...
...
src/devpoll.cpp
View file @
4efe2366
...
...
@@ -69,7 +69,8 @@ void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_)
zmq_assert
(
rc
==
sizeof
pfd
);
}
zmq
::
handle_t
zmq
::
devpoll_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
reactor_
)
zmq
::
devpoll_t
::
handle_t
zmq
::
devpoll_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
reactor_
)
{
assert
(
!
fd_table
[
fd_
].
valid
);
...
...
@@ -84,17 +85,15 @@ zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_)
// Increase the load metric of the thread.
load
.
add
(
1
);
handle_t
handle
;
handle
.
fd
=
fd_
;
return
handle
;
return
fd_
;
}
void
zmq
::
devpoll_t
::
rm_fd
(
handle_t
handle_
)
{
assert
(
fd_table
[
handle_
.
fd
].
valid
);
assert
(
fd_table
[
handle_
].
valid
);
devpoll_ctl
(
handle_
.
fd
,
POLLREMOVE
);
fd_table
[
handle_
.
fd
].
valid
=
false
;
devpoll_ctl
(
handle_
,
POLLREMOVE
);
fd_table
[
handle_
].
valid
=
false
;
// Decrease the load metric of the thread.
load
.
sub
(
1
);
...
...
@@ -102,34 +101,30 @@ void zmq::devpoll_t::rm_fd (handle_t handle_)
void
zmq
::
devpoll_t
::
set_pollin
(
handle_t
handle_
)
{
fd_t
fd
=
handle_
.
fd
;
devpoll_ctl
(
fd
,
POLLREMOVE
);
fd_table
[
fd
].
events
|=
POLLIN
;
devpoll_ctl
(
fd
,
fd_table
[
fd
].
events
);
devpoll_ctl
(
handle_
,
POLLREMOVE
);
fd_table
[
handle_
].
events
|=
POLLIN
;
devpoll_ctl
(
handle_
,
fd_table
[
handle_
].
events
);
}
void
zmq
::
devpoll_t
::
reset_pollin
(
handle_t
handle_
)
{
fd_t
fd
=
handle_
.
fd
;
devpoll_ctl
(
fd
,
POLLREMOVE
);
fd_table
[
fd
].
events
&=
~
((
short
)
POLLIN
);
devpoll_ctl
(
fd
,
fd_table
[
fd
].
events
);
devpoll_ctl
(
handle_
,
POLLREMOVE
);
fd_table
[
handle_
].
events
&=
~
((
short
)
POLLIN
);
devpoll_ctl
(
handle_
,
fd_table
[
handle_
].
events
);
}
void
zmq
::
devpoll_t
::
set_pollout
(
handle_t
handle_
)
{
fd_t
fd
=
handle_
.
fd
;
devpoll_ctl
(
fd
,
POLLREMOVE
);
fd_table
[
fd
].
events
|=
POLLOUT
;
devpoll_ctl
(
fd
,
fd_table
[
fd
].
events
);
devpoll_ctl
(
handle_
,
POLLREMOVE
);
fd_table
[
handle_
].
events
|=
POLLOUT
;
devpoll_ctl
(
handle_
,
fd_table
[
handle_
].
events
);
}
void
zmq
::
devpoll_t
::
reset_pollout
(
handle_t
handle_
)
{
fd_t
fd
=
handle_
.
fd
;
devpoll_ctl
(
fd
,
POLLREMOVE
);
fd_table
[
fd
].
events
&=
~
((
short
)
POLLOUT
);
devpoll_ctl
(
fd
,
fd_table
[
fd
].
events
);
devpoll_ctl
(
handle_
,
POLLREMOVE
);
fd_table
[
handle_
].
events
&=
~
((
short
)
POLLOUT
);
devpoll_ctl
(
handle_
,
fd_table
[
handle_
].
events
);
}
void
zmq
::
devpoll_t
::
add_timer
(
i_poll_events
*
events_
)
...
...
src/devpoll.hpp
View file @
4efe2366
...
...
@@ -26,7 +26,6 @@
#include <vector>
#include "i_poller.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "atomic_counter.hpp"
...
...
@@ -37,22 +36,24 @@ namespace zmq
// Implements socket polling mechanism using the Solaris-specific
// "/dev/poll" interface.
class
devpoll_t
:
public
i_poller
class
devpoll_t
{
public
:
typedef
fd_t
handle_t
;
devpoll_t
();
~
devpoll_t
();
//
i_poller implementation
.
handle_t
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
);
//
"poller" concept
.
handle_t
add_fd
(
fd_t
fd_
,
struct
i_poll_events
*
events_
);
void
rm_fd
(
handle_t
handle_
);
void
set_pollin
(
handle_t
handle_
);
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
add_timer
(
i_poll_events
*
events_
);
void
cancel_timer
(
i_poll_events
*
events_
);
void
add_timer
(
struct
i_poll_events
*
events_
);
void
cancel_timer
(
struct
i_poll_events
*
events_
);
int
get_load
();
void
start
();
void
stop
();
...
...
src/epoll.cpp
View file @
4efe2366
...
...
@@ -52,7 +52,7 @@ zmq::epoll_t::~epoll_t ()
delete
*
it
;
}
zmq
::
handle_t
zmq
::
epoll_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
)
zmq
::
epoll_t
::
handle_t
zmq
::
epoll_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
)
{
poll_entry_t
*
pe
=
new
poll_entry_t
;
zmq_assert
(
pe
!=
NULL
);
...
...
@@ -72,14 +72,12 @@ zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
// Increase the load metric of the thread.
load
.
add
(
1
);
handle_t
handle
;
handle
.
ptr
=
pe
;
return
handle
;
return
pe
;
}
void
zmq
::
epoll_t
::
rm_fd
(
handle_t
handle_
)
{
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
.
ptr
;
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
;
int
rc
=
epoll_ctl
(
epoll_fd
,
EPOLL_CTL_DEL
,
pe
->
fd
,
&
pe
->
ev
);
errno_assert
(
rc
!=
-
1
);
pe
->
fd
=
retired_fd
;
...
...
@@ -91,7 +89,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_)
void
zmq
::
epoll_t
::
set_pollin
(
handle_t
handle_
)
{
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
.
ptr
;
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
;
pe
->
ev
.
events
|=
EPOLLIN
;
int
rc
=
epoll_ctl
(
epoll_fd
,
EPOLL_CTL_MOD
,
pe
->
fd
,
&
pe
->
ev
);
errno_assert
(
rc
!=
-
1
);
...
...
@@ -99,7 +97,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_)
void
zmq
::
epoll_t
::
reset_pollin
(
handle_t
handle_
)
{
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
.
ptr
;
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
;
pe
->
ev
.
events
&=
~
((
short
)
EPOLLIN
);
int
rc
=
epoll_ctl
(
epoll_fd
,
EPOLL_CTL_MOD
,
pe
->
fd
,
&
pe
->
ev
);
errno_assert
(
rc
!=
-
1
);
...
...
@@ -107,7 +105,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_)
void
zmq
::
epoll_t
::
set_pollout
(
handle_t
handle_
)
{
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
.
ptr
;
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
;
pe
->
ev
.
events
|=
EPOLLOUT
;
int
rc
=
epoll_ctl
(
epoll_fd
,
EPOLL_CTL_MOD
,
pe
->
fd
,
&
pe
->
ev
);
errno_assert
(
rc
!=
-
1
);
...
...
@@ -115,7 +113,7 @@ void zmq::epoll_t::set_pollout (handle_t handle_)
void
zmq
::
epoll_t
::
reset_pollout
(
handle_t
handle_
)
{
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
.
ptr
;
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
;
pe
->
ev
.
events
&=
~
((
short
)
EPOLLOUT
);
int
rc
=
epoll_ctl
(
epoll_fd
,
EPOLL_CTL_MOD
,
pe
->
fd
,
&
pe
->
ev
);
errno_assert
(
rc
!=
-
1
);
...
...
src/epoll.hpp
View file @
4efe2366
...
...
@@ -27,8 +27,6 @@
#include <vector>
#include <sys/epoll.h>
#include "i_poller.hpp"
//#include "i_poll_events.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "atomic_counter.hpp"
...
...
@@ -39,22 +37,24 @@ namespace zmq
// This class implements socket polling mechanism using the Linux-specific
// epoll mechanism.
class
epoll_t
:
public
i_poller
class
epoll_t
{
public
:
typedef
void
*
handle_t
;
epoll_t
();
~
epoll_t
();
//
i_poller implementation
.
handle_t
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
);
//
"poller" concept
.
handle_t
add_fd
(
fd_t
fd_
,
struct
i_poll_events
*
events_
);
void
rm_fd
(
handle_t
handle_
);
void
set_pollin
(
handle_t
handle_
);
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
add_timer
(
i_poll_events
*
events_
);
void
cancel_timer
(
i_poll_events
*
events_
);
void
add_timer
(
struct
i_poll_events
*
events_
);
void
cancel_timer
(
struct
i_poll_events
*
events_
);
int
get_load
();
void
start
();
void
stop
();
...
...
src/i_poller.hpp
deleted
100644 → 0
View file @
4a3b4dad
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_POLLER_HPP_INCLUDED__
#define __ZMQ_I_POLLER_HPP_INCLUDED__
#include "fd.hpp"
namespace
zmq
{
union
handle_t
{
fd_t
fd
;
void
*
ptr
;
};
// Virtual interface to be used when polling on file descriptors.
struct
i_poller
{
virtual
~
i_poller
()
{};
// Add file descriptor to the polling set. Return handle
// representing the descriptor. 'events' interface will be used
// to invoke callback functions when event occurs.
virtual
handle_t
add_fd
(
fd_t
fd_
,
struct
i_poll_events
*
events_
)
=
0
;
// Remove file descriptor identified by handle from the polling set.
virtual
void
rm_fd
(
handle_t
handle_
)
=
0
;
// Start polling for input from socket.
virtual
void
set_pollin
(
handle_t
handle_
)
=
0
;
// Stop polling for input from socket.
virtual
void
reset_pollin
(
handle_t
handle_
)
=
0
;
// Start polling for availability of the socket for writing.
virtual
void
set_pollout
(
handle_t
handle_
)
=
0
;
// Stop polling for availability of the socket for writing.
virtual
void
reset_pollout
(
handle_t
handle_
)
=
0
;
// Ask to be notified after some time. Actual interval varies between
// 0 and max_timer_period ms. Timer is destroyed once it expires or,
// optionally, when cancel_timer is called.
virtual
void
add_timer
(
struct
i_poll_events
*
events_
)
=
0
;
// Cancel the timer set by add_timer method.
virtual
void
cancel_timer
(
struct
i_poll_events
*
events_
)
=
0
;
// Returns load experienced by the I/O thread. Currently it's number
// of file descriptors handled by the poller, in the future we may
// use a metric taking actual traffic on the individual sockets into
// account.
virtual
int
get_load
()
=
0
;
// Start the execution of the underlying I/O thread.
// This method is called from a foreign thread.
virtual
void
start
()
=
0
;
// Ask underlying I/O thread to stop.
virtual
void
stop
()
=
0
;
};
}
#endif
src/io_object.cpp
View file @
4efe2366
...
...
@@ -36,7 +36,7 @@ void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_)
poller
=
io_thread_
->
get_poller
();
}
zmq
::
handle_t
zmq
::
io_object_t
::
add_fd
(
fd_t
fd_
)
zmq
::
io_object_t
::
handle_t
zmq
::
io_object_t
::
add_fd
(
fd_t
fd_
)
{
return
poller
->
add_fd
(
fd_
,
this
);
}
...
...
src/io_object.hpp
View file @
4efe2366
...
...
@@ -22,7 +22,7 @@
#include <stddef.h>
#include "
i_
poller.hpp"
#include "poller.hpp"
#include "i_poll_events.hpp"
namespace
zmq
...
...
@@ -41,6 +41,8 @@ namespace zmq
protected
:
typedef
poller_t
::
handle_t
handle_t
;
// Derived class can init/swap the underlying I/O thread.
// Caution: Remove all the file descriptors from the old I/O thread
// before swapping to the new one!
...
...
@@ -63,7 +65,7 @@ namespace zmq
private
:
struct
i_poller
*
poller
;
poller_t
*
poller
;
io_object_t
(
const
io_object_t
&
);
void
operator
=
(
const
io_object_t
&
);
...
...
src/io_thread.cpp
View file @
4efe2366
...
...
@@ -24,11 +24,6 @@
#include "platform.hpp"
#include "err.hpp"
#include "command.hpp"
#include "epoll.hpp"
#include "poll.hpp"
#include "select.hpp"
#include "devpoll.hpp"
#include "kqueue.hpp"
#include "dispatcher.hpp"
#include "simple_semaphore.hpp"
...
...
@@ -36,39 +31,7 @@ zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
int
flags_
)
:
object_t
(
dispatcher_
,
thread_slot_
)
{
#if defined ZMQ_FORCE_SELECT
poller
=
new
select_t
;
#elif defined ZMQ_FORCE_POLL
poller
=
new
poll_t
;
#elif defined ZMQ_FORCE_EPOLL
poller
=
new
epoll_t
;
#elif defined ZMQ_FORCE_DEVPOLL
poller
=
new
devpoll_t
;
#elif defined ZMQ_FORCE_KQUEUE
poller
=
new
kqueue_t
;
#elif defined ZMQ_HAVE_LINUX
poller
=
new
epoll_t
;
#elif defined ZMQ_HAVE_WINDOWS
poller
=
new
select_t
;
#elif defined ZMQ_HAVE_FREEBSD
poller
=
new
kqueue_t
;
#elif defined ZMQ_HAVE_OPENBSD
poller
=
new
kqueue_t
;
#elif defined ZMQ_HAVE_SOLARIS
poller
=
new
devpoll_t
;
#elif defined ZMQ_HAVE_OSX
poller
=
new
kqueue_t
;
#elif defined ZMQ_HAVE_QNXNTO
poller
=
new
poll_t
;
#elif defined ZMQ_HAVE_AIX
poller
=
new
poll_t
;
#elif defined ZMQ_HAVE_HPUX
poller
=
new
devpoll_t
;
#elif defined ZMQ_HAVE_OPENVMS
poller
=
new
select_t
;
#else
#error Unsupported platform
#endif
poller
=
new
poller_t
;
zmq_assert
(
poller
);
signaler_handle
=
poller
->
add_fd
(
signaler
.
get_fd
(),
this
);
...
...
@@ -134,7 +97,7 @@ void zmq::io_thread_t::timer_event ()
zmq_assert
(
false
);
}
zmq
::
i_poller
*
zmq
::
io_thread_t
::
get_poller
()
zmq
::
poller_t
*
zmq
::
io_thread_t
::
get_poller
()
{
zmq_assert
(
poller
);
return
poller
;
...
...
src/io_thread.hpp
View file @
4efe2366
...
...
@@ -23,7 +23,7 @@
#include <vector>
#include "object.hpp"
#include "
i_
poller.hpp"
#include "poller.hpp"
#include "i_poll_events.hpp"
#include "fd_signaler.hpp"
...
...
@@ -59,7 +59,7 @@ namespace zmq
void
timer_event
();
// Used by io_objects to retrieve the assciated poller object.
struct
i_poller
*
get_poller
();
poller_t
*
get_poller
();
// Command handlers.
void
process_stop
();
...
...
@@ -74,10 +74,10 @@ namespace zmq
fd_signaler_t
signaler
;
// Handle associated with signaler's file descriptor.
handle_t
signaler_handle
;
poller_t
::
handle_t
signaler_handle
;
// I/O multiplexing is performed using a poller object.
i_poller
*
poller
;
poller_t
*
poller
;
};
}
...
...
src/kqueue.cpp
View file @
4efe2366
...
...
@@ -68,7 +68,8 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
errno_assert
(
rc
!=
-
1
);
}
zmq
::
handle_t
zmq
::
kqueue_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
reactor_
)
zmq
::
kqueue_t
::
handle_t
zmq
::
kqueue_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
reactor_
)
{
poll_entry_t
*
pe
=
new
poll_entry_t
;
zmq_assert
(
pe
!=
NULL
);
...
...
@@ -78,14 +79,12 @@ zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_)
pe
->
flag_pollout
=
0
;
pe
->
reactor
=
reactor_
;
handle_t
handle
;
handle
.
ptr
=
pe
;
return
handle
;
return
pe
;
}
void
zmq
::
kqueue_t
::
rm_fd
(
handle_t
handle_
)
{
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
.
ptr
;
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
;
if
(
pe
->
flag_pollin
)
kevent_delete
(
pe
->
fd
,
EVFILT_READ
);
if
(
pe
->
flag_pollout
)
...
...
@@ -96,28 +95,28 @@ void zmq::kqueue_t::rm_fd (handle_t handle_)
void
zmq
::
kqueue_t
::
set_pollin
(
handle_t
handle_
)
{
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
.
ptr
;
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
;
pe
->
flag_pollin
=
true
;
kevent_add
(
pe
->
fd
,
EVFILT_READ
,
pe
);
}
void
zmq
::
kqueue_t
::
reset_pollin
(
handle_t
handle_
)
{
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
.
ptr
;
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
;
pe
->
flag_pollin
=
false
;
kevent_delete
(
pe
->
fd
,
EVFILT_READ
);
}
void
zmq
::
kqueue_t
::
set_pollout
(
handle_t
handle_
)
{
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
.
ptr
;
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
;
pe
->
flag_pollout
=
true
;
kevent_add
(
pe
->
fd
,
EVFILT_WRITE
,
pe
);
}
void
zmq
::
kqueue_t
::
reset_pollout
(
handle_t
handle_
)
{
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
.
ptr
;
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
handle_
;
pe
->
flag_pollout
=
false
;
kevent_delete
(
pe
->
fd
,
EVFILT_WRITE
);
}
...
...
src/kqueue.hpp
View file @
4efe2366
...
...
@@ -26,7 +26,6 @@
#include <vector>
#include "i_poller.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "atomic_counter.hpp"
...
...
@@ -37,22 +36,24 @@ namespace zmq
// Implements socket polling mechanism using the BSD-specific
// kqueue interface.
class
kqueue_t
:
public
i_poller
class
kqueue_t
{
public
:
typedef
void
*
handle_t
;
kqueue_t
();
~
kqueue_t
();
//
i_poller implementation
.
handle_t
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
);
//
"poller" concept
.
handle_t
add_fd
(
fd_t
fd_
,
struct
i_poll_events
*
events_
);
void
rm_fd
(
handle_t
handle_
);
void
set_pollin
(
handle_t
handle_
);
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
add_timer
(
i_poll_events
*
events_
);
void
cancel_timer
(
i_poll_events
*
events_
);
void
add_timer
(
struct
i_poll_events
*
events_
);
void
cancel_timer
(
struct
i_poll_events
*
events_
);
int
get_load
();
void
start
();
void
stop
();
...
...
src/poll.cpp
View file @
4efe2366
...
...
@@ -58,7 +58,7 @@ zmq::poll_t::~poll_t ()
zmq_assert
(
load
.
get
()
==
0
);
}
zmq
::
handle_t
zmq
::
poll_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
)
zmq
::
poll_t
::
handle_t
zmq
::
poll_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
)
{
pollfd
pfd
=
{
fd_
,
0
,
0
};
pollset
.
push_back
(
pfd
);
...
...
@@ -70,19 +70,17 @@ zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
// Increase the load metric of the thread.
load
.
add
(
1
);
handle_t
handle
;
handle
.
fd
=
fd_
;
return
handle
;
return
fd_
;
}
void
zmq
::
poll_t
::
rm_fd
(
handle_t
handle_
)
{
fd_t
index
=
fd_table
[
handle_
.
fd
].
index
;
fd_t
index
=
fd_table
[
handle_
].
index
;
assert
(
index
!=
retired_fd
);
// Mark the fd as unused.
pollset
[
index
].
fd
=
retired_fd
;
fd_table
[
handle_
.
fd
].
index
=
retired_fd
;
fd_table
[
handle_
].
index
=
retired_fd
;
retired
=
true
;
// Decrease the load metric of the thread.
...
...
@@ -91,25 +89,25 @@ void zmq::poll_t::rm_fd (handle_t handle_)
void
zmq
::
poll_t
::
set_pollin
(
handle_t
handle_
)
{
int
index
=
fd_table
[
handle_
.
fd
].
index
;
int
index
=
fd_table
[
handle_
].
index
;
pollset
[
index
].
events
|=
POLLIN
;
}
void
zmq
::
poll_t
::
reset_pollin
(
handle_t
handle_
)
{
int
index
=
fd_table
[
handle_
.
fd
].
index
;
int
index
=
fd_table
[
handle_
].
index
;
pollset
[
index
].
events
&=
~
((
short
)
POLLIN
);
}
void
zmq
::
poll_t
::
set_pollout
(
handle_t
handle_
)
{
int
index
=
fd_table
[
handle_
.
fd
].
index
;
int
index
=
fd_table
[
handle_
].
index
;
pollset
[
index
].
events
|=
POLLOUT
;
}
void
zmq
::
poll_t
::
reset_pollout
(
handle_t
handle_
)
{
int
index
=
fd_table
[
handle_
.
fd
].
index
;
int
index
=
fd_table
[
handle_
].
index
;
pollset
[
index
].
events
&=
~
((
short
)
POLLOUT
);
}
...
...
src/poll.hpp
View file @
4efe2366
...
...
@@ -31,7 +31,6 @@
#include <stddef.h>
#include <vector>
#include "i_poller.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "atomic_counter.hpp"
...
...
@@ -42,22 +41,24 @@ namespace zmq
// Implements socket polling mechanism using the POSIX.1-2001
// poll() system call.
class
poll_t
:
public
i_poller
class
poll_t
{
public
:
typedef
fd_t
handle_t
;
poll_t
();
~
poll_t
();
//
i_poller implementation
.
handle_t
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
);
//
"poller" concept
.
handle_t
add_fd
(
fd_t
fd_
,
struct
i_poll_events
*
events_
);
void
rm_fd
(
handle_t
handle_
);
void
set_pollin
(
handle_t
handle_
);
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
add_timer
(
i_poll_events
*
events_
);
void
cancel_timer
(
i_poll_events
*
events_
);
void
add_timer
(
struct
i_poll_events
*
events_
);
void
cancel_timer
(
struct
i_poll_events
*
events_
);
int
get_load
();
void
start
();
void
stop
();
...
...
src/poller.hpp
0 → 100644
View file @
4efe2366
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_POLLER_HPP_INCLUDED__
#define __ZMQ_POLLER_HPP_INCLUDED__
#include "epoll.hpp"
#include "poll.hpp"
#include "select.hpp"
#include "devpoll.hpp"
#include "kqueue.hpp"
namespace
zmq
{
#if defined ZMQ_FORCE_SELECT
typedef
select_t
poller_t
;
#elif defined ZMQ_FORCE_POLL
typedef
poll_t
poller_t
;
#elif defined ZMQ_FORCE_EPOLL
typedef
epoll_t
poller_t
;
#elif defined ZMQ_FORCE_DEVPOLL
typedef
devpoll_t
poller_t
;
#elif defined ZMQ_FORCE_KQUEUE
typedef
kqueue_t
poller_t
;
#elif defined ZMQ_HAVE_LINUX
typedef
epoll_t
poller_t
;
#elif defined ZMQ_HAVE_WINDOWS
typedef
select_t
poller_t
;
#elif defined ZMQ_HAVE_FREEBSD
typedef
kqueue_t
poller_t
;
#elif defined ZMQ_HAVE_OPENBSD
typedef
kqueue_t
poller_t
;
#elif defined ZMQ_HAVE_SOLARIS
typedef
devpoll_t
poller_t
;
#elif defined ZMQ_HAVE_OSX
typedef
kqueue_t
poller_t
;
#elif defined ZMQ_HAVE_QNXNTO
typedef
poll_t
poller_t
;
#elif defined ZMQ_HAVE_AIX
typedef
poll_t
poller_t
;
#elif defined ZMQ_HAVE_HPUX
typedef
devpoll_t
poller_t
;
#elif defined ZMQ_HAVE_OPENVMS
typedef
select_t
poller_t
;
#else
#error Unsupported platform
#endif
}
#endif
src/select.cpp
View file @
4efe2366
...
...
@@ -59,7 +59,7 @@ zmq::select_t::~select_t ()
zmq_assert
(
load
.
get
()
==
0
);
}
zmq
::
handle_t
zmq
::
select_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
)
zmq
::
select_t
::
handle_t
zmq
::
select_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
)
{
// Store the file descriptor.
fd_entry_t
entry
=
{
fd_
,
events_
};
...
...
@@ -75,38 +75,33 @@ zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
// Increase the load metric of the thread.
load
.
add
(
1
);
handle_t
handle
;
handle
.
fd
=
fd_
;
return
handle
;
return
fd_
;
}
void
zmq
::
select_t
::
rm_fd
(
handle_t
handle_
)
{
// Get file descriptor.
fd_t
fd
=
handle_
.
fd
;
// Mark the descriptor as retired.
fd_set_t
::
iterator
it
;
for
(
it
=
fds
.
begin
();
it
!=
fds
.
end
();
it
++
)
if
(
it
->
fd
==
fd
)
if
(
it
->
fd
==
handle_
)
break
;
zmq_assert
(
it
!=
fds
.
end
());
it
->
fd
=
retired_fd
;
retired
=
true
;
// Stop polling on the descriptor.
FD_CLR
(
fd
,
&
source_set_in
);
FD_CLR
(
fd
,
&
source_set_out
);
FD_CLR
(
fd
,
&
source_set_err
);
FD_CLR
(
handle_
,
&
source_set_in
);
FD_CLR
(
handle_
,
&
source_set_out
);
FD_CLR
(
handle_
,
&
source_set_err
);
// Discard all events generated on this file descriptor.
FD_CLR
(
fd
,
&
readfds
);
FD_CLR
(
fd
,
&
writefds
);
FD_CLR
(
fd
,
&
exceptfds
);
FD_CLR
(
handle_
,
&
readfds
);
FD_CLR
(
handle_
,
&
writefds
);
FD_CLR
(
handle_
,
&
exceptfds
);
// Adjust the maxfd attribute if we have removed the
// highest-numbered file descriptor.
if
(
fd
==
maxfd
)
{
if
(
handle_
==
maxfd
)
{
maxfd
=
retired_fd
;
for
(
fd_set_t
::
iterator
it
=
fds
.
begin
();
it
!=
fds
.
end
();
it
++
)
if
(
it
->
fd
>
maxfd
)
...
...
@@ -119,22 +114,22 @@ void zmq::select_t::rm_fd (handle_t handle_)
void
zmq
::
select_t
::
set_pollin
(
handle_t
handle_
)
{
FD_SET
(
handle_
.
fd
,
&
source_set_in
);
FD_SET
(
handle_
,
&
source_set_in
);
}
void
zmq
::
select_t
::
reset_pollin
(
handle_t
handle_
)
{
FD_CLR
(
handle_
.
fd
,
&
source_set_in
);
FD_CLR
(
handle_
,
&
source_set_in
);
}
void
zmq
::
select_t
::
set_pollout
(
handle_t
handle_
)
{
FD_SET
(
handle_
.
fd
,
&
source_set_out
);
FD_SET
(
handle_
,
&
source_set_out
);
}
void
zmq
::
select_t
::
reset_pollout
(
handle_t
handle_
)
{
FD_CLR
(
handle_
.
fd
,
&
source_set_out
);
FD_CLR
(
handle_
,
&
source_set_out
);
}
void
zmq
::
select_t
::
add_timer
(
i_poll_events
*
events_
)
...
...
src/select.hpp
View file @
4efe2366
...
...
@@ -34,7 +34,6 @@
#include <sys/select.h>
#endif
#include "i_poller.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "atomic_counter.hpp"
...
...
@@ -45,22 +44,24 @@ namespace zmq
// Implements socket polling mechanism using POSIX.1-2001 select()
// function.
class
select_t
:
public
i_poller
class
select_t
{
public
:
typedef
fd_t
handle_t
;
select_t
();
~
select_t
();
//
i_poller implementation
.
handle_t
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
);
//
"poller" concept
.
handle_t
add_fd
(
fd_t
fd_
,
struct
i_poll_events
*
events_
);
void
rm_fd
(
handle_t
handle_
);
void
set_pollin
(
handle_t
handle_
);
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
add_timer
(
i_poll_events
*
events_
);
void
cancel_timer
(
i_poll_events
*
events_
);
void
add_timer
(
struct
i_poll_events
*
events_
);
void
cancel_timer
(
struct
i_poll_events
*
events_
);
int
get_load
();
void
start
();
void
stop
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment