Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
238640a5
Commit
238640a5
authored
Sep 26, 2010
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
timers properly implemented
parent
8d7bf668
Hide whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
113 additions
and
193 deletions
+113
-193
config.hpp
src/config.hpp
+0
-4
devpoll.cpp
src/devpoll.cpp
+5
-30
devpoll.hpp
src/devpoll.hpp
+0
-6
epoll.cpp
src/epoll.cpp
+7
-36
epoll.hpp
src/epoll.hpp
+0
-6
kqueue.cpp
src/kqueue.cpp
+6
-34
kqueue.hpp
src/kqueue.hpp
+0
-6
poll.cpp
src/poll.cpp
+7
-26
poll.hpp
src/poll.hpp
+0
-6
poller_base.cpp
src/poller_base.cpp
+51
-1
poller_base.hpp
src/poller_base.hpp
+27
-0
select.cpp
src/select.cpp
+7
-30
select.hpp
src/select.hpp
+0
-6
zmq_connecter.cpp
src/zmq_connecter.cpp
+2
-1
zmq_connecter.hpp
src/zmq_connecter.hpp
+1
-1
No files found.
src/config.hpp
View file @
238640a5
...
...
@@ -69,10 +69,6 @@ namespace zmq
// Maximum number of events the I/O thread can process in one go.
max_io_events
=
256
,
// TODO: To be removed
// Maximal wait time for a timer (milliseconds).
max_timer_period
=
100
,
// How long to wait (milliseconds) till reattempting to connect.
reconnect_period
=
100
,
...
...
src/devpoll.cpp
View file @
238640a5
...
...
@@ -124,18 +124,6 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_)
devpoll_ctl
(
handle_
,
fd_table
[
handle_
].
events
);
}
void
zmq
::
devpoll_t
::
add_timer
(
int
timeout_
,
i_poll_events
*
events_
,
int
id_
)
{
timers
.
push_back
(
events_
);
}
void
zmq
::
devpoll_t
::
cancel_timer
(
i_poll_events
*
events_
,
int
id_
)
{
timers_t
::
iterator
it
=
std
::
find
(
timers
.
begin
(),
timers
.
end
(),
events_
);
if
(
it
!=
timers
.
end
())
timers
.
erase
(
it
);
}
void
zmq
::
devpoll_t
::
start
()
{
worker
.
start
(
worker_routine
,
this
);
...
...
@@ -161,31 +149,18 @@ void zmq::devpoll_t::loop ()
fd_table
[
pending_list
[
i
]].
accepted
=
true
;
pending_list
.
clear
();
poll_req
.
dp_fds
=
&
ev_buf
[
0
];
poll_req
.
dp_nfds
=
nfds
;
poll_req
.
dp_timeout
=
timers
.
empty
()
?
-
1
:
max_timer_period
;
// Execute any due timers.
uint64_t
timeout
=
execute_timers
();
// Wait for events.
poll_req
.
dp_fds
=
&
ev_buf
[
0
];
poll_req
.
dp_nfds
=
nfds
;
poll_req
.
dp_timeout
=
timout
?
timeout
:
-
1
;
int
n
=
ioctl
(
devpoll_fd
,
DP_POLL
,
&
poll_req
);
if
(
n
==
-
1
&&
errno
==
EINTR
)
continue
;
errno_assert
(
n
!=
-
1
);
// Handle timer.
if
(
!
n
)
{
// Use local list of timers as timer handlers may fill new timers
// into the original array.
timers_t
t
;
std
::
swap
(
timers
,
t
);
// Trigger all the timers.
for
(
timers_t
::
iterator
it
=
t
.
begin
();
it
!=
t
.
end
();
it
++
)
(
*
it
)
->
timer_event
(
-
1
);
continue
;
}
for
(
int
i
=
0
;
i
<
n
;
i
++
)
{
fd_entry_t
*
fd_ptr
=
&
fd_table
[
ev_buf
[
i
].
fd
];
...
...
src/devpoll.hpp
View file @
238640a5
...
...
@@ -51,8 +51,6 @@ namespace zmq
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
add_timer
(
int
timeout_
,
struct
i_poll_events
*
events_
,
int
id_
);
void
cancel_timer
(
struct
i_poll_events
*
events_
,
int
id_
);
void
start
();
void
stop
();
...
...
@@ -83,10 +81,6 @@ namespace zmq
// Pollset manipulation function.
void
devpoll_ctl
(
fd_t
fd_
,
short
events_
);
// List of all the engines waiting for the timer event.
typedef
std
::
vector
<
struct
i_poll_events
*>
timers_t
;
timers_t
timers
;
// If true, thread is in the process of shutting down.
bool
stopping
;
...
...
src/epoll.cpp
View file @
238640a5
...
...
@@ -117,19 +117,6 @@ void zmq::epoll_t::reset_pollout (handle_t handle_)
errno_assert
(
rc
!=
-
1
);
}
void
zmq
::
epoll_t
::
add_timer
(
int
timeout_
,
i_poll_events
*
events_
,
int
id_
)
{
timers
.
push_back
(
events_
);
}
void
zmq
::
epoll_t
::
cancel_timer
(
i_poll_events
*
events_
,
int
id_
)
{
timers_t
::
iterator
it
=
std
::
find
(
timers
.
begin
(),
timers
.
end
(),
events_
);
if
(
it
==
timers
.
end
())
return
;
timers
.
erase
(
it
);
}
void
zmq
::
epoll_t
::
start
()
{
worker
.
start
(
worker_routine
,
this
);
...
...
@@ -146,31 +133,15 @@ void zmq::epoll_t::loop ()
while
(
!
stopping
)
{
// Wait for events.
int
n
;
while
(
true
)
{
n
=
epoll_wait
(
epoll_fd
,
&
ev_buf
[
0
],
max_io_events
,
timers
.
empty
()
?
-
1
:
max_timer_period
);
if
(
!
(
n
==
-
1
&&
errno
==
EINTR
))
{
errno_assert
(
n
!=
-
1
);
break
;
}
}
// Handle timer.
if
(
!
n
)
{
// Use local list of timers as timer handlers may fill new timers
// into the original array.
timers_t
t
;
std
::
swap
(
timers
,
t
);
// Trigger all the timers.
for
(
timers_t
::
iterator
it
=
t
.
begin
();
it
!=
t
.
end
();
it
++
)
(
*
it
)
->
timer_event
(
-
1
);
// Execute any due timers.
uint64_t
timeout
=
execute_timers
();
// Wait for events.
int
n
=
epoll_wait
(
epoll_fd
,
&
ev_buf
[
0
],
max_io_events
,
timeout
?
timeout
:
-
1
);
if
(
n
==
-
1
&&
errno
==
EINTR
)
continue
;
}
errno_assert
(
n
!=
-
1
);
for
(
int
i
=
0
;
i
<
n
;
i
++
)
{
poll_entry_t
*
pe
=
((
poll_entry_t
*
)
ev_buf
[
i
].
data
.
ptr
);
...
...
src/epoll.hpp
View file @
238640a5
...
...
@@ -53,8 +53,6 @@ namespace zmq
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
add_timer
(
int
timeout_
,
struct
i_poll_events
*
events_
,
int
id_
);
void
cancel_timer
(
struct
i_poll_events
*
events_
,
int
id_
);
void
start
();
void
stop
();
...
...
@@ -80,10 +78,6 @@ namespace zmq
typedef
std
::
vector
<
poll_entry_t
*>
retired_t
;
retired_t
retired
;
// List of all the engines waiting for the timer event.
typedef
std
::
vector
<
struct
i_poll_events
*>
timers_t
;
timers_t
timers
;
// If true, thread is in the process of shutting down.
bool
stopping
;
...
...
src/kqueue.cpp
View file @
238640a5
...
...
@@ -132,18 +132,6 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_)
kevent_delete
(
pe
->
fd
,
EVFILT_WRITE
);
}
void
zmq
::
kqueue_t
::
add_timer
(
int
timeout_
,
i_poll_events
*
events_
,
int
id_
)
{
timers
.
push_back
(
events_
);
}
void
zmq
::
kqueue_t
::
cancel_timer
(
i_poll_events
*
events_
,
int
id_
)
{
timers_t
::
iterator
it
=
std
::
find
(
timers
.
begin
(),
timers
.
end
(),
events_
);
if
(
it
!=
timers
.
end
())
timers
.
erase
(
it
);
}
void
zmq
::
kqueue_t
::
start
()
{
worker
.
start
(
worker_routine
,
this
);
...
...
@@ -158,34 +146,18 @@ void zmq::kqueue_t::loop ()
{
while
(
!
stopping
)
{
struct
kevent
ev_buf
[
max_io_events
];
// Compute time interval to wait.
timespec
timeout
=
{
max_timer_period
/
1000
,
(
max_timer_period
%
1000
)
*
1000000
};
// Execute any due timers.
uint64_t
timeout
=
execute_timers
();
// Wait for events.
int
n
=
kevent
(
kqueue_fd
,
NULL
,
0
,
&
ev_buf
[
0
],
max_io_events
,
timers
.
empty
()
?
NULL
:
&
timeout
);
struct
kevent
ev_buf
[
max_io_events
];
timespec
ts
=
{
timeout
/
1000
,
(
timeout
%
1000
)
*
1000000
};
int
n
=
kevent
(
kqueue_fd
,
NULL
,
0
,
&
ev_buf
[
0
],
max_io_events
,
timeout
?
&
ts
:
NULL
);
if
(
n
==
-
1
&&
errno
==
EINTR
)
continue
;
errno_assert
(
n
!=
-
1
);
// Handle timer.
if
(
!
n
)
{
// Use local list of timers as timer handlers may fill new timers
// into the original array.
timers_t
t
;
std
::
swap
(
timers
,
t
);
// Trigger all the timers.
for
(
timers_t
::
iterator
it
=
t
.
begin
();
it
!=
t
.
end
();
it
++
)
(
*
it
)
->
timer_event
(
-
1
);
continue
;
}
for
(
int
i
=
0
;
i
<
n
;
i
++
)
{
poll_entry_t
*
pe
=
(
poll_entry_t
*
)
ev_buf
[
i
].
udata
;
...
...
src/kqueue.hpp
View file @
238640a5
...
...
@@ -53,8 +53,6 @@ namespace zmq
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
add_timer
(
int
timeout_
,
struct
i_poll_events
*
events_
,
int
id_
);
void
cancel_timer
(
struct
i_poll_events
*
events_
,
int
id_
);
void
start
();
void
stop
();
...
...
@@ -87,10 +85,6 @@ namespace zmq
typedef
std
::
vector
<
poll_entry_t
*>
retired_t
;
retired_t
retired
;
// List of all the engines waiting for the timer event.
typedef
std
::
vector
<
struct
i_poll_events
*>
timers_t
;
timers_t
timers
;
// If true, thread is in the process of shutting down.
bool
stopping
;
...
...
src/poll.cpp
View file @
238640a5
...
...
@@ -109,18 +109,6 @@ void zmq::poll_t::reset_pollout (handle_t handle_)
pollset
[
index
].
events
&=
~
((
short
)
POLLOUT
);
}
void
zmq
::
poll_t
::
add_timer
(
int
timeout_
,
i_poll_events
*
events_
,
int
id_
)
{
timers
.
push_back
(
events_
);
}
void
zmq
::
poll_t
::
cancel_timer
(
i_poll_events
*
events_
,
int
id_
)
{
timers_t
::
iterator
it
=
std
::
find
(
timers
.
begin
(),
timers
.
end
(),
events_
);
if
(
it
!=
timers
.
end
())
timers
.
erase
(
it
);
}
void
zmq
::
poll_t
::
start
()
{
worker
.
start
(
worker_routine
,
this
);
...
...
@@ -135,27 +123,20 @@ void zmq::poll_t::loop ()
{
while
(
!
stopping
)
{
// Execute any due timers.
uint64_t
timeout
=
execute_timers
();
// Wait for events.
int
rc
=
poll
(
&
pollset
[
0
],
pollset
.
size
(),
timers
.
empty
()
?
-
1
:
max_timer_period
);
int
rc
=
poll
(
&
pollset
[
0
],
pollset
.
size
(),
timeout
?
timeout
:
-
1
);
if
(
rc
==
-
1
&&
errno
==
EINTR
)
continue
;
errno_assert
(
rc
!=
-
1
);
// Handle timer.
if
(
!
rc
)
{
// Use local list of timers as timer handlers may fill new timers
// into the original array.
timers_t
t
;
std
::
swap
(
timers
,
t
);
// Trigger all the timers.
for
(
timers_t
::
iterator
it
=
t
.
begin
();
it
!=
t
.
end
();
it
++
)
(
*
it
)
->
timer_event
(
-
1
);
// If there are no events (i.e. it's a timeout) there's no point
// in checking the pollset.
if
(
rc
==
0
)
continue
;
}
for
(
pollset_t
::
size_type
i
=
0
;
i
!=
pollset
.
size
();
i
++
)
{
...
...
src/poll.hpp
View file @
238640a5
...
...
@@ -58,8 +58,6 @@ namespace zmq
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
add_timer
(
int
timeout_
,
struct
i_poll_events
*
events_
,
int
id_
);
void
cancel_timer
(
struct
i_poll_events
*
events_
,
int
id_
);
void
start
();
void
stop
();
...
...
@@ -87,10 +85,6 @@ namespace zmq
// If true, there's at least one retired event source.
bool
retired
;
// List of all the engines waiting for the timer event.
typedef
std
::
vector
<
struct
i_poll_events
*>
timers_t
;
timers_t
timers
;
// If true, thread is in the process of shutting down.
bool
stopping
;
...
...
src/poller_base.cpp
View file @
238640a5
...
...
@@ -18,6 +18,7 @@
*/
#include "poller_base.hpp"
#include "i_poll_events.hpp"
#include "err.hpp"
zmq
::
poller_base_t
::
poller_base_t
()
...
...
@@ -26,7 +27,7 @@ zmq::poller_base_t::poller_base_t ()
zmq
::
poller_base_t
::~
poller_base_t
()
{
// Make sure there
are no fds registered on
shutdown.
// Make sure there
is no more load on the
shutdown.
zmq_assert
(
get_load
()
==
0
);
}
...
...
@@ -42,3 +43,52 @@ void zmq::poller_base_t::adjust_load (int amount_)
else
if
(
amount_
<
0
)
load
.
sub
(
-
amount_
);
}
void
zmq
::
poller_base_t
::
add_timer
(
int
timeout_
,
i_poll_events
*
sink_
,
int
id_
)
{
uint64_t
expiration
=
clock
.
now_ms
()
+
timeout_
;
timer_info_t
info
=
{
sink_
,
id_
};
timers
.
insert
(
std
::
make_pair
(
expiration
,
info
));
}
void
zmq
::
poller_base_t
::
cancel_timer
(
i_poll_events
*
sink_
,
int
id_
)
{
// Complexity of this operation is O(n). We assume it is rarely used.
for
(
timers_t
::
iterator
it
=
timers
.
begin
();
it
!=
timers
.
end
();
it
++
)
if
(
it
->
second
.
sink
==
sink_
&&
it
->
second
.
id
==
id_
)
{
timers
.
erase
(
it
);
return
;
}
// Timer not found.
zmq_assert
(
false
);
}
uint64_t
zmq
::
poller_base_t
::
execute_timers
()
{
// Get the current time.
uint64_t
current
=
clock
.
now_ms
();
// Execute the timers that are already due.
timers_t
::
iterator
it
=
timers
.
begin
();
while
(
it
!=
timers
.
end
())
{
// If we have to wait to execute the item, same will be true about
// all the following items (multimap is sorted). Thus we can stop
// checking the subsequent timers and return the time to wait for
// the next timer (at least 1ms).
if
(
it
->
first
>
current
)
return
it
->
first
-
current
;
// Trigger the timer.
it
->
second
.
sink
->
timer_event
(
it
->
second
.
id
);
// Remove it from the list of active timers.
timers_t
::
iterator
o
=
it
;
++
it
;
timers
.
erase
(
o
);
}
// There are no more timers.
return
0
;
}
src/poller_base.hpp
View file @
238640a5
...
...
@@ -20,6 +20,9 @@
#ifndef __ZMQ_POLLER_BASE_HPP_INCLUDED__
#define __ZMQ_POLLER_BASE_HPP_INCLUDED__
#include <map>
#include "clock.hpp"
#include "atomic_counter.hpp"
namespace
zmq
...
...
@@ -36,13 +39,37 @@ namespace zmq
// invoked from a different thread!
int
get_load
();
// Add a timeout to expire in timeout_ milliseconds. After the
// expiration timer_event on sink_ object will be called with
// argument set to id_.
void
add_timer
(
int
timeout_
,
struct
i_poll_events
*
sink_
,
int
id_
);
// Cancel the timer created by sink_ object with ID equal to id_.
void
cancel_timer
(
struct
i_poll_events
*
sink_
,
int
id_
);
protected
:
// Called by individual poller implementations to manage the load.
void
adjust_load
(
int
amount_
);
// Executes any timers that are due. Returns number of milliseconds
// to wait to match the next timer or 0 meaning "no timers".
uint64_t
execute_timers
();
private
:
// Clock instance private to this I/O thread.
clock_t
clock
;
// List of active timers.
struct
timer_info_t
{
struct
i_poll_events
*
sink
;
int
id
;
};
typedef
std
::
multimap
<
uint64_t
,
timer_info_t
>
timers_t
;
timers_t
timers
;
// Load of the poller. Currently the number of file descriptors
// registered.
atomic_counter_t
load
;
...
...
src/select.cpp
View file @
238640a5
...
...
@@ -133,18 +133,6 @@ void zmq::select_t::reset_pollout (handle_t handle_)
FD_CLR
(
handle_
,
&
source_set_out
);
}
void
zmq
::
select_t
::
add_timer
(
int
timeout_
,
i_poll_events
*
events_
,
int
id_
)
{
timers
.
push_back
(
events_
);
}
void
zmq
::
select_t
::
cancel_timer
(
i_poll_events
*
events_
,
int
id_
)
{
timers_t
::
iterator
it
=
std
::
find
(
timers
.
begin
(),
timers
.
end
(),
events_
);
if
(
it
!=
timers
.
end
())
timers
.
erase
(
it
);
}
void
zmq
::
select_t
::
start
()
{
worker
.
start
(
worker_routine
,
this
);
...
...
@@ -164,14 +152,13 @@ void zmq::select_t::loop ()
memcpy
(
&
writefds
,
&
source_set_out
,
sizeof
source_set_out
);
memcpy
(
&
exceptfds
,
&
source_set_err
,
sizeof
source_set_err
);
// Compute the timout interval. Select is free to overwrite the
// value so we have to compute it each time anew.
timeval
timeout
=
{
max_timer_period
/
1000
,
(
max_timer_period
%
1000
)
*
1000
};
// Execute any due timers.
uint64_t
timeout
=
execute_timers
();
// Wait for events.
struct
timeval
tv
=
{
timeout
/
1000
,
timeout
%
1000
*
1000
};
int
rc
=
select
(
maxfd
+
1
,
&
readfds
,
&
writefds
,
&
exceptfds
,
time
rs
.
empty
()
?
NULL
:
&
timeout
);
time
out
?
&
tv
:
NULL
);
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert
(
rc
!=
SOCKET_ERROR
);
...
...
@@ -181,20 +168,10 @@ void zmq::select_t::loop ()
errno_assert
(
rc
!=
-
1
);
#endif
// Handle timer.
if
(
!
rc
)
{
// Use local list of timers as timer handlers may fill new timers
// into the original array.
timers_t
t
;
std
::
swap
(
timers
,
t
);
// Trigger all the timers.
for
(
timers_t
::
iterator
it
=
t
.
begin
();
it
!=
t
.
end
();
it
++
)
(
*
it
)
->
timer_event
(
-
1
);
// If there are no events (i.e. it's a timeout) there's no point
// in checking the pollset.
if
(
rc
==
0
)
continue
;
}
for
(
fd_set_t
::
size_type
i
=
0
;
i
<
fds
.
size
();
i
++
)
{
if
(
fds
[
i
].
fd
==
retired_fd
)
...
...
src/select.hpp
View file @
238640a5
...
...
@@ -60,8 +60,6 @@ namespace zmq
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
add_timer
(
int
timeout_
,
struct
i_poll_events
*
events_
,
int
id_
);
void
cancel_timer
(
struct
i_poll_events
*
events_
,
int
id_
);
void
start
();
void
stop
();
...
...
@@ -98,10 +96,6 @@ namespace zmq
// If true, at least one file descriptor has retired.
bool
retired
;
// List of all the engines waiting for the timer event.
typedef
std
::
vector
<
struct
i_poll_events
*>
timers_t
;
timers_t
timers
;
// If true, thread is shutting down.
bool
stopping
;
...
...
src/zmq_connecter.cpp
View file @
238640a5
...
...
@@ -92,8 +92,9 @@ void zmq::zmq_connecter_t::out_event ()
terminate
();
}
void
zmq
::
zmq_connecter_t
::
timer_event
()
void
zmq
::
zmq_connecter_t
::
timer_event
(
int
id_
)
{
zmq_assert
(
id_
==
reconnect_timer_id
);
wait
=
false
;
start_connecting
();
}
...
...
src/zmq_connecter.hpp
View file @
238640a5
...
...
@@ -51,7 +51,7 @@ namespace zmq
// Handlers for I/O events.
void
in_event
();
void
out_event
();
void
timer_event
();
void
timer_event
(
int
id_
);
// Internal function to start the actual connection establishment.
void
start_connecting
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment