Commit c19470ec authored by Constantin Rack's avatar Constantin Rack

Merge pull request #1675 from somdoron/Timers

Problem: no way to schedule timers with zmq_poll or zmq_poller 
parents 9ce8fe8f aadaf990
......@@ -117,6 +117,7 @@ test_getsockopt_memset
test_setsockopt
test_stream_exceeds_buffer
test_poller
test_timers
tests/test*.log
tests/test*.trs
src/platform.hpp*
......
......@@ -449,6 +449,7 @@ set(cxx-sources
zmq_utils.cpp
decoder_allocators.cpp
socket_poller.cpp
timers.cpp
config.hpp)
set(rc-sources version.rc)
......
......@@ -179,6 +179,8 @@ src_libzmq_la_SOURCES = \
src/tcp_listener.hpp \
src/thread.cpp \
src/thread.hpp \
src/timers.cpp \
src/timers.hpp \
src/tipc_address.cpp \
src/tipc_address.hpp \
src/tipc_connecter.cpp \
......@@ -376,7 +378,8 @@ test_apps = \
tests/test_socketopt_hwm \
tests/test_heartbeats \
tests/test_stream_exceeds_buffer \
tests/test_poller
tests/test_poller \
tests/test_timers
tests_test_system_SOURCES = tests/test_system.cpp
tests_test_system_LDADD = src/libzmq.la
......@@ -587,6 +590,9 @@ tests_test_stream_exceeds_buffer_LDADD = src/libzmq.la
tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = src/libzmq.la
tests_test_timers_SOURCES = tests/test_timers.cpp
tests_test_timers_LDADD = src/libzmq.la
if !ON_MINGW
if !ON_CYGWIN
......@@ -729,4 +735,3 @@ dist-hook:
maintainer-clean-local:
-rm -rf $(top_srcdir)/config
......@@ -451,6 +451,21 @@ ZMQ_EXPORT int zmq_poller_modify_fd (void *poller, int fd, short events);
ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd);
#endif
/******************************************************************************/
/* Scheduling timers */
/******************************************************************************/
typedef void (zmq_timer_fn)(int timer_id, void *arg);
ZMQ_EXPORT void *zmq_timers_new ();
ZMQ_EXPORT int zmq_timers_close (void *timers);
ZMQ_EXPORT int zmq_timers_add (void *timers, size_t interval, zmq_timer_fn handler, void *arg);
ZMQ_EXPORT int zmq_timers_cancel (void *timers, int timer_id);
ZMQ_EXPORT int zmq_timers_set_interval (void *timers, int timer_id, size_t interval);
ZMQ_EXPORT int zmq_timers_reset (void *timers, int timer_id);
ZMQ_EXPORT long zmq_timers_timeout (void *timers);
ZMQ_EXPORT int zmq_timers_execute (void *timers);
/******************************************************************************/
/* Message proxying */
/******************************************************************************/
......@@ -542,4 +557,3 @@ ZMQ_EXPORT void zmq_threadclose (void* thread);
#endif
#endif
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "timers.hpp"
#include "err.hpp"
zmq::timers_t::timers_t () :
tag (0xCAFEDADA),
next_timer_id (0)
{
}
zmq::timers_t::~timers_t ()
{
// Mark the timers as dead
tag = 0xdeadbeef;
}
bool zmq::timers_t::check_tag ()
{
return tag == 0xCAFEDADA;
}
int zmq::timers_t::add (size_t interval_, timers_timer_fn handler_, void* arg_)
{
uint64_t when = clock.now_ms() + interval_;
timer_t timer = {++next_timer_id, interval_, handler_, arg_};
timers.insert (timersmap_t::value_type (when, timer));
return timer.timer_id;
}
int zmq::timers_t::cancel (int timer_id_)
{
cancelled_timers_t::iterator it = cancelled_timers.find (timer_id_);
if (it != cancelled_timers.end ()) {
errno = EINVAL;
return -1;
}
cancelled_timers.insert (timer_id_);
return 0;
}
int zmq::timers_t::set_interval (int timer_id_, size_t interval_)
{
for (timersmap_t::iterator it = timers.begin (); it != timers.end (); ++it) {
if (it->second.timer_id == timer_id_) {
timer_t timer = it->second;
timer.interval = interval_;
uint64_t when = clock.now_ms() + interval_;
timers.erase (it);
timers.insert (timersmap_t::value_type (when, timer));
return 0;
}
}
errno = EINVAL;
return -1;
}
int zmq::timers_t::reset (int timer_id_) {
for (timersmap_t::iterator it = timers.begin (); it != timers.end (); ++it) {
if (it->second.timer_id == timer_id_) {
timer_t timer = it->second;
uint64_t when = clock.now_ms() + timer.interval;
timers.erase (it);
timers.insert (timersmap_t::value_type (when, timer));
return 0;
}
}
errno = EINVAL;
return -1;
}
long zmq::timers_t::timeout ()
{
timersmap_t::iterator it = timers.begin ();
uint64_t now = clock.now_ms();
while (it != timers.end ()) {
cancelled_timers_t::iterator cancelled_it = cancelled_timers.find (it->second.timer_id);
// Live timer, lets return the timeout
if (cancelled_it == cancelled_timers.end ()) {
if (it->first > now)
return it->first - now;
else
return 0;
}
// Let's remove it from the begining of the list
timersmap_t::iterator old = it;
++it;
timers.erase (old);
cancelled_timers.erase (cancelled_it);
}
// Wait forever as no timers are alive
return -1;
}
int zmq::timers_t::execute ()
{
timersmap_t::iterator it = timers.begin ();
uint64_t now = clock.now_ms();
while (it != timers.end ()) {
cancelled_timers_t::iterator cancelled_it = cancelled_timers.find (it->second.timer_id);
// Dead timer, lets remove it and continue
if (cancelled_it != cancelled_timers.end ()) {
timersmap_t::iterator old = it;
++it;
timers.erase (old);
cancelled_timers.erase (cancelled_it);
continue;
}
// Map is ordered, if we have to wait for current timer we can stop.
if (it->first > now)
break;
timer_t timer = it->second;
timer.handler (timer.timer_id, timer.arg);
timersmap_t::iterator old = it;
++it;
timers.erase (old);
timers.insert (timersmap_t::value_type (now + timer.interval, timer));
}
return 0;
}
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_TIMERS_HPP_INCLUDED__
#define __ZMQ_TIMERS_HPP_INCLUDED__
#include <stddef.h>
#include <map>
#include <set>
#include "clock.hpp"
namespace zmq
{
typedef void (timers_timer_fn)(
int timer_id, void *arg);
class timers_t
{
public:
timers_t ();
~timers_t ();
// Add timer to the set, timer repeats forever, or until cancel is called.
// Returns a timer_id that is used to cancel the timer.
// Returns -1 if there was an error.
int add (size_t interval, timers_timer_fn handler, void* arg);
// Set the interval of the timer.
// This method is slow, cancelling exsting and adding a new timer yield better performance.
// Returns 0 on success and -1 on error.
int set_interval (int timer_id, size_t interval);
// Reset the timer.
// This method is slow, cancelling exsting and adding a new timer yield better performance.
// Returns 0 on success and -1 on error.
int reset (int timer_id);
// Cancel a timer.
// Returns 0 on success and -1 on error.
int cancel (int timer_id);
// Returns the time in millisecond until the next timer.
// Returns -1 if no timer is due.
long timeout ();
// Execute timers.
// Return 0 if all succeed and -1 if error.
int execute ();
// Return false if object is not a timers class.
bool check_tag ();
private:
// Used to check whether the object is a timers class.
uint32_t tag;
int next_timer_id;
// Clock instance.
clock_t clock;
typedef struct timer_t {
int timer_id;
size_t interval;
timers_timer_fn *handler;
void *arg;
} timer_t;
typedef std::multimap <uint64_t, timer_t> timersmap_t;
timersmap_t timers;
typedef std::set<int> cancelled_timers_t;
cancelled_timers_t cancelled_timers;
timers_t (const timers_t&);
const timers_t &operator = (const timers_t&);
};
}
#endif
......@@ -77,6 +77,7 @@ struct iovec {
#include "metadata.hpp"
#include "signaler.hpp"
#include "socket_poller.hpp"
#include "timers.hpp"
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
......@@ -1044,7 +1045,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// The poller functionality
void* zmq_poller_new ()
void* zmq_poller_new ()
{
zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t;
alloc_assert (poller);
......@@ -1130,7 +1131,7 @@ int zmq_poller_remove (void *poller_, void *s_)
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
}
if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
errno = ENOTSOCK;
......@@ -1154,7 +1155,7 @@ int zmq_poller_remove_fd (void *poller_, int fd_)
return ((zmq::socket_poller_t*)poller_)->remove_fd (fd_);
}
int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
{
......@@ -1169,12 +1170,92 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
event->socket = e.socket;
event->fd = e.fd;
event->user_data = e.user_data;
event->user_data = e.user_data;
event->events = e.events;
return rc;
}
// Timers
void *zmq_timers_new ()
{
zmq::timers_t *timers = new (std::nothrow) zmq::timers_t;
alloc_assert (timers);
return timers;
}
int zmq_timers_close (void *timers_)
{
if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
errno = EFAULT;
return -1;
}
delete ((zmq::timers_t*)timers_);
return 0;
}
int zmq_timers_add (void *timers_, size_t interval_, zmq_timer_fn handler_, void *arg_)
{
if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::timers_t*)timers_)->add (interval_, handler_, arg_);
}
int zmq_timers_cancel (void *timers_, int timer_id_)
{
if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::timers_t*)timers_)->cancel (timer_id_);
}
int zmq_timers_set_interval (void *timers_, int timer_id_, size_t interval_)
{
if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::timers_t*)timers_)->set_interval (timer_id_, interval_);
}
int zmq_timers_reset (void *timers_, int timer_id_)
{
if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::timers_t*)timers_)->reset (timer_id_);
}
long zmq_timers_timeout (void *timers_)
{
if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::timers_t*)timers_)->timeout ();
}
int zmq_timers_execute (void *timers_)
{
if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::timers_t*)timers_)->execute ();
}
// The proxy functionality
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
......
......@@ -69,6 +69,7 @@ set(tests
test_sub_forward_tipc
test_xpub_manual
test_xpub_welcome_msg
test_timers
)
if(NOT WIN32)
list(APPEND tests
......
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "../include/zmq_utils.h"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#endif
void _sleep (long timeout_)
{
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
#elif defined ZMQ_HAVE_ANDROID
usleep (timeout_ * 1000);
#else
usleep (timeout_ * 1000);
#endif
}
void handler (int timer_id, void* arg)
{
*((bool *)arg) = true;
}
int main (void)
{
setup_test_environment ();
void* timers = zmq_timers_new ();
assert (timers);
bool timer_invoked = false;
int timer_id = zmq_timers_add (timers, 100, handler, &timer_invoked);
assert (timer_id);
// Timer should be invoked yet
int rc = zmq_timers_execute (timers);
assert (rc == 0);
assert (!timer_invoked);
// Wait half the time and check again
_sleep (zmq_timers_timeout (timers) / 2);
rc = zmq_timers_execute (timers);
assert (rc == 0);
assert (!timer_invoked);
// Wait until the end
_sleep (zmq_timers_timeout (timers));
rc = zmq_timers_execute (timers);
assert (rc == 0);
assert (timer_invoked);
timer_invoked = false;
// Wait half the time and check again
long timeout = zmq_timers_timeout (timers);
_sleep (timeout / 2);
rc = zmq_timers_execute (timers);
assert (rc == 0);
assert (!timer_invoked);
// Reset timer and wait half of the time left
rc = zmq_timers_reset (timers, timer_id);
_sleep (timeout / 2);
rc = zmq_timers_execute (timers);
assert (rc == 0);
assert (!timer_invoked);
// Wait until the end
_sleep (zmq_timers_timeout (timers));
rc = zmq_timers_execute (timers);
assert (rc == 0);
assert (timer_invoked);
timer_invoked = false;
// reschedule
zmq_timers_set_interval (timers, timer_id, 50);
_sleep (51);
rc = zmq_timers_execute (timers);
assert (rc == 0);
assert (timer_invoked);
timer_invoked = false;
// cancel timer
timeout = zmq_timers_timeout (timers);
zmq_timers_cancel (timers, timer_id);
_sleep (timeout * 2);
rc = zmq_timers_execute (timers);
assert (rc == 0);
assert (!timer_invoked);
rc = zmq_timers_close (timers);
assert (rc == 0);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment