Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
aadaf990
Commit
aadaf990
authored
Dec 18, 2015
by
somdoron
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add timers API to libzmq
parent
819a879f
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
513 additions
and
7 deletions
+513
-7
.gitignore
.gitignore
+1
-0
CMakeLists.txt
CMakeLists.txt
+1
-0
Makefile.am
Makefile.am
+7
-2
zmq.h
include/zmq.h
+15
-1
timers.cpp
src/timers.cpp
+169
-0
timers.hpp
src/timers.hpp
+108
-0
zmq.cpp
src/zmq.cpp
+85
-4
CMakeLists.txt
tests/CMakeLists.txt
+1
-0
test_timers.cpp
tests/test_timers.cpp
+126
-0
No files found.
.gitignore
View file @
aadaf990
...
...
@@ -117,6 +117,7 @@ test_getsockopt_memset
test_setsockopt
test_stream_exceeds_buffer
test_poller
test_timers
tests/test*.log
tests/test*.trs
src/platform.hpp*
...
...
CMakeLists.txt
View file @
aadaf990
...
...
@@ -449,6 +449,7 @@ set(cxx-sources
zmq_utils.cpp
decoder_allocators.cpp
socket_poller.cpp
timers.cpp
config.hpp
)
set
(
rc-sources version.rc
)
...
...
Makefile.am
View file @
aadaf990
...
...
@@ -179,6 +179,8 @@ src_libzmq_la_SOURCES = \
src/tcp_listener.hpp
\
src/thread.cpp
\
src/thread.hpp
\
src/timers.cpp
\
src/timers.hpp
\
src/tipc_address.cpp
\
src/tipc_address.hpp
\
src/tipc_connecter.cpp
\
...
...
@@ -376,7 +378,8 @@ test_apps = \
tests/test_socketopt_hwm
\
tests/test_heartbeats
\
tests/test_stream_exceeds_buffer
\
tests/test_poller
tests/test_poller
\
tests/test_timers
tests_test_system_SOURCES
=
tests/test_system.cpp
tests_test_system_LDADD
=
src/libzmq.la
...
...
@@ -587,6 +590,9 @@ tests_test_stream_exceeds_buffer_LDADD = src/libzmq.la
tests_test_poller_SOURCES
=
tests/test_poller.cpp
tests_test_poller_LDADD
=
src/libzmq.la
tests_test_timers_SOURCES
=
tests/test_timers.cpp
tests_test_timers_LDADD
=
src/libzmq.la
if
!ON_MINGW
if
!ON_CYGWIN
...
...
@@ -729,4 +735,3 @@ dist-hook:
maintainer-clean-local
:
-
rm
-rf
$(top_srcdir)
/config
include/zmq.h
View file @
aadaf990
...
...
@@ -451,6 +451,21 @@ ZMQ_EXPORT int zmq_poller_modify_fd (void *poller, int fd, short events);
ZMQ_EXPORT
int
zmq_poller_remove_fd
(
void
*
poller
,
int
fd
);
#endif
/******************************************************************************/
/* Scheduling timers */
/******************************************************************************/
typedef
void
(
zmq_timer_fn
)(
int
timer_id
,
void
*
arg
);
ZMQ_EXPORT
void
*
zmq_timers_new
();
ZMQ_EXPORT
int
zmq_timers_close
(
void
*
timers
);
ZMQ_EXPORT
int
zmq_timers_add
(
void
*
timers
,
size_t
interval
,
zmq_timer_fn
handler
,
void
*
arg
);
ZMQ_EXPORT
int
zmq_timers_cancel
(
void
*
timers
,
int
timer_id
);
ZMQ_EXPORT
int
zmq_timers_set_interval
(
void
*
timers
,
int
timer_id
,
size_t
interval
);
ZMQ_EXPORT
int
zmq_timers_reset
(
void
*
timers
,
int
timer_id
);
ZMQ_EXPORT
long
zmq_timers_timeout
(
void
*
timers
);
ZMQ_EXPORT
int
zmq_timers_execute
(
void
*
timers
);
/******************************************************************************/
/* Message proxying */
/******************************************************************************/
...
...
@@ -542,4 +557,3 @@ ZMQ_EXPORT void zmq_threadclose (void* thread);
#endif
#endif
src/timers.cpp
0 → 100644
View file @
aadaf990
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "timers.hpp"
#include "err.hpp"
zmq
::
timers_t
::
timers_t
()
:
tag
(
0xCAFEDADA
),
next_timer_id
(
0
)
{
}
zmq
::
timers_t
::~
timers_t
()
{
// Mark the timers as dead
tag
=
0xdeadbeef
;
}
bool
zmq
::
timers_t
::
check_tag
()
{
return
tag
==
0xCAFEDADA
;
}
int
zmq
::
timers_t
::
add
(
size_t
interval_
,
timers_timer_fn
handler_
,
void
*
arg_
)
{
uint64_t
when
=
clock
.
now_ms
()
+
interval_
;
timer_t
timer
=
{
++
next_timer_id
,
interval_
,
handler_
,
arg_
};
timers
.
insert
(
timersmap_t
::
value_type
(
when
,
timer
));
return
timer
.
timer_id
;
}
int
zmq
::
timers_t
::
cancel
(
int
timer_id_
)
{
cancelled_timers_t
::
iterator
it
=
cancelled_timers
.
find
(
timer_id_
);
if
(
it
!=
cancelled_timers
.
end
())
{
errno
=
EINVAL
;
return
-
1
;
}
cancelled_timers
.
insert
(
timer_id_
);
return
0
;
}
int
zmq
::
timers_t
::
set_interval
(
int
timer_id_
,
size_t
interval_
)
{
for
(
timersmap_t
::
iterator
it
=
timers
.
begin
();
it
!=
timers
.
end
();
++
it
)
{
if
(
it
->
second
.
timer_id
==
timer_id_
)
{
timer_t
timer
=
it
->
second
;
timer
.
interval
=
interval_
;
uint64_t
when
=
clock
.
now_ms
()
+
interval_
;
timers
.
erase
(
it
);
timers
.
insert
(
timersmap_t
::
value_type
(
when
,
timer
));
return
0
;
}
}
errno
=
EINVAL
;
return
-
1
;
}
int
zmq
::
timers_t
::
reset
(
int
timer_id_
)
{
for
(
timersmap_t
::
iterator
it
=
timers
.
begin
();
it
!=
timers
.
end
();
++
it
)
{
if
(
it
->
second
.
timer_id
==
timer_id_
)
{
timer_t
timer
=
it
->
second
;
uint64_t
when
=
clock
.
now_ms
()
+
timer
.
interval
;
timers
.
erase
(
it
);
timers
.
insert
(
timersmap_t
::
value_type
(
when
,
timer
));
return
0
;
}
}
errno
=
EINVAL
;
return
-
1
;
}
long
zmq
::
timers_t
::
timeout
()
{
timersmap_t
::
iterator
it
=
timers
.
begin
();
uint64_t
now
=
clock
.
now_ms
();
while
(
it
!=
timers
.
end
())
{
cancelled_timers_t
::
iterator
cancelled_it
=
cancelled_timers
.
find
(
it
->
second
.
timer_id
);
// Live timer, lets return the timeout
if
(
cancelled_it
==
cancelled_timers
.
end
())
{
if
(
it
->
first
>
now
)
return
it
->
first
-
now
;
else
return
0
;
}
// Let's remove it from the begining of the list
timersmap_t
::
iterator
old
=
it
;
++
it
;
timers
.
erase
(
old
);
cancelled_timers
.
erase
(
cancelled_it
);
}
// Wait forever as no timers are alive
return
-
1
;
}
int
zmq
::
timers_t
::
execute
()
{
timersmap_t
::
iterator
it
=
timers
.
begin
();
uint64_t
now
=
clock
.
now_ms
();
while
(
it
!=
timers
.
end
())
{
cancelled_timers_t
::
iterator
cancelled_it
=
cancelled_timers
.
find
(
it
->
second
.
timer_id
);
// Dead timer, lets remove it and continue
if
(
cancelled_it
!=
cancelled_timers
.
end
())
{
timersmap_t
::
iterator
old
=
it
;
++
it
;
timers
.
erase
(
old
);
cancelled_timers
.
erase
(
cancelled_it
);
continue
;
}
// Map is ordered, if we have to wait for current timer we can stop.
if
(
it
->
first
>
now
)
break
;
timer_t
timer
=
it
->
second
;
timer
.
handler
(
timer
.
timer_id
,
timer
.
arg
);
timersmap_t
::
iterator
old
=
it
;
++
it
;
timers
.
erase
(
old
);
timers
.
insert
(
timersmap_t
::
value_type
(
now
+
timer
.
interval
,
timer
));
}
return
0
;
}
src/timers.hpp
0 → 100644
View file @
aadaf990
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_TIMERS_HPP_INCLUDED__
#define __ZMQ_TIMERS_HPP_INCLUDED__
#include <stddef.h>
#include <map>
#include <set>
#include "clock.hpp"
namespace
zmq
{
typedef
void
(
timers_timer_fn
)(
int
timer_id
,
void
*
arg
);
class
timers_t
{
public
:
timers_t
();
~
timers_t
();
// Add timer to the set, timer repeats forever, or until cancel is called.
// Returns a timer_id that is used to cancel the timer.
// Returns -1 if there was an error.
int
add
(
size_t
interval
,
timers_timer_fn
handler
,
void
*
arg
);
// Set the interval of the timer.
// This method is slow, cancelling exsting and adding a new timer yield better performance.
// Returns 0 on success and -1 on error.
int
set_interval
(
int
timer_id
,
size_t
interval
);
// Reset the timer.
// This method is slow, cancelling exsting and adding a new timer yield better performance.
// Returns 0 on success and -1 on error.
int
reset
(
int
timer_id
);
// Cancel a timer.
// Returns 0 on success and -1 on error.
int
cancel
(
int
timer_id
);
// Returns the time in millisecond until the next timer.
// Returns -1 if no timer is due.
long
timeout
();
// Execute timers.
// Return 0 if all succeed and -1 if error.
int
execute
();
// Return false if object is not a timers class.
bool
check_tag
();
private
:
// Used to check whether the object is a timers class.
uint32_t
tag
;
int
next_timer_id
;
// Clock instance.
clock_t
clock
;
typedef
struct
timer_t
{
int
timer_id
;
size_t
interval
;
timers_timer_fn
*
handler
;
void
*
arg
;
}
timer_t
;
typedef
std
::
multimap
<
uint64_t
,
timer_t
>
timersmap_t
;
timersmap_t
timers
;
typedef
std
::
set
<
int
>
cancelled_timers_t
;
cancelled_timers_t
cancelled_timers
;
timers_t
(
const
timers_t
&
);
const
timers_t
&
operator
=
(
const
timers_t
&
);
};
}
#endif
src/zmq.cpp
View file @
aadaf990
...
...
@@ -77,6 +77,7 @@ struct iovec {
#include "metadata.hpp"
#include "signaler.hpp"
#include "socket_poller.hpp"
#include "timers.hpp"
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
...
...
@@ -1044,7 +1045,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// The poller functionality
void
*
zmq_poller_new
()
void
*
zmq_poller_new
()
{
zmq
::
socket_poller_t
*
poller
=
new
(
std
::
nothrow
)
zmq
::
socket_poller_t
;
alloc_assert
(
poller
);
...
...
@@ -1130,7 +1131,7 @@ int zmq_poller_remove (void *poller_, void *s_)
if
(
!
poller_
||
!
((
zmq
::
socket_poller_t
*
)
poller_
)
->
check_tag
())
{
errno
=
EFAULT
;
return
-
1
;
}
}
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
...
...
@@ -1154,7 +1155,7 @@ int zmq_poller_remove_fd (void *poller_, int fd_)
return
((
zmq
::
socket_poller_t
*
)
poller_
)
->
remove_fd
(
fd_
);
}
int
zmq_poller_wait
(
void
*
poller_
,
zmq_poller_event_t
*
event
,
long
timeout_
)
{
...
...
@@ -1169,12 +1170,92 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
event
->
socket
=
e
.
socket
;
event
->
fd
=
e
.
fd
;
event
->
user_data
=
e
.
user_data
;
event
->
user_data
=
e
.
user_data
;
event
->
events
=
e
.
events
;
return
rc
;
}
// Timers
void
*
zmq_timers_new
()
{
zmq
::
timers_t
*
timers
=
new
(
std
::
nothrow
)
zmq
::
timers_t
;
alloc_assert
(
timers
);
return
timers
;
}
int
zmq_timers_close
(
void
*
timers_
)
{
if
(
!
timers_
||
!
((
zmq
::
timers_t
*
)
timers_
)
->
check_tag
())
{
errno
=
EFAULT
;
return
-
1
;
}
delete
((
zmq
::
timers_t
*
)
timers_
);
return
0
;
}
int
zmq_timers_add
(
void
*
timers_
,
size_t
interval_
,
zmq_timer_fn
handler_
,
void
*
arg_
)
{
if
(
!
timers_
||
!
((
zmq
::
timers_t
*
)
timers_
)
->
check_tag
())
{
errno
=
EFAULT
;
return
-
1
;
}
return
((
zmq
::
timers_t
*
)
timers_
)
->
add
(
interval_
,
handler_
,
arg_
);
}
int
zmq_timers_cancel
(
void
*
timers_
,
int
timer_id_
)
{
if
(
!
timers_
||
!
((
zmq
::
timers_t
*
)
timers_
)
->
check_tag
())
{
errno
=
EFAULT
;
return
-
1
;
}
return
((
zmq
::
timers_t
*
)
timers_
)
->
cancel
(
timer_id_
);
}
int
zmq_timers_set_interval
(
void
*
timers_
,
int
timer_id_
,
size_t
interval_
)
{
if
(
!
timers_
||
!
((
zmq
::
timers_t
*
)
timers_
)
->
check_tag
())
{
errno
=
EFAULT
;
return
-
1
;
}
return
((
zmq
::
timers_t
*
)
timers_
)
->
set_interval
(
timer_id_
,
interval_
);
}
int
zmq_timers_reset
(
void
*
timers_
,
int
timer_id_
)
{
if
(
!
timers_
||
!
((
zmq
::
timers_t
*
)
timers_
)
->
check_tag
())
{
errno
=
EFAULT
;
return
-
1
;
}
return
((
zmq
::
timers_t
*
)
timers_
)
->
reset
(
timer_id_
);
}
long
zmq_timers_timeout
(
void
*
timers_
)
{
if
(
!
timers_
||
!
((
zmq
::
timers_t
*
)
timers_
)
->
check_tag
())
{
errno
=
EFAULT
;
return
-
1
;
}
return
((
zmq
::
timers_t
*
)
timers_
)
->
timeout
();
}
int
zmq_timers_execute
(
void
*
timers_
)
{
if
(
!
timers_
||
!
((
zmq
::
timers_t
*
)
timers_
)
->
check_tag
())
{
errno
=
EFAULT
;
return
-
1
;
}
return
((
zmq
::
timers_t
*
)
timers_
)
->
execute
();
}
// The proxy functionality
int
zmq_proxy
(
void
*
frontend_
,
void
*
backend_
,
void
*
capture_
)
...
...
tests/CMakeLists.txt
View file @
aadaf990
...
...
@@ -69,6 +69,7 @@ set(tests
test_sub_forward_tipc
test_xpub_manual
test_xpub_welcome_msg
test_timers
)
if
(
NOT WIN32
)
list
(
APPEND tests
...
...
tests/test_timers.cpp
0 → 100644
View file @
aadaf990
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "../include/zmq_utils.h"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#endif
void
_sleep
(
long
timeout_
)
{
#if defined ZMQ_HAVE_WINDOWS
Sleep
(
timeout_
>
0
?
timeout_
:
INFINITE
);
#elif defined ZMQ_HAVE_ANDROID
usleep
(
timeout_
*
1000
);
#else
usleep
(
timeout_
*
1000
);
#endif
}
void
handler
(
int
timer_id
,
void
*
arg
)
{
*
((
bool
*
)
arg
)
=
true
;
}
int
main
(
void
)
{
setup_test_environment
();
void
*
timers
=
zmq_timers_new
();
assert
(
timers
);
bool
timer_invoked
=
false
;
int
timer_id
=
zmq_timers_add
(
timers
,
100
,
handler
,
&
timer_invoked
);
assert
(
timer_id
);
// Timer should be invoked yet
int
rc
=
zmq_timers_execute
(
timers
);
assert
(
rc
==
0
);
assert
(
!
timer_invoked
);
// Wait half the time and check again
_sleep
(
zmq_timers_timeout
(
timers
)
/
2
);
rc
=
zmq_timers_execute
(
timers
);
assert
(
rc
==
0
);
assert
(
!
timer_invoked
);
// Wait until the end
_sleep
(
zmq_timers_timeout
(
timers
));
rc
=
zmq_timers_execute
(
timers
);
assert
(
rc
==
0
);
assert
(
timer_invoked
);
timer_invoked
=
false
;
// Wait half the time and check again
long
timeout
=
zmq_timers_timeout
(
timers
);
_sleep
(
timeout
/
2
);
rc
=
zmq_timers_execute
(
timers
);
assert
(
rc
==
0
);
assert
(
!
timer_invoked
);
// Reset timer and wait half of the time left
rc
=
zmq_timers_reset
(
timers
,
timer_id
);
_sleep
(
timeout
/
2
);
rc
=
zmq_timers_execute
(
timers
);
assert
(
rc
==
0
);
assert
(
!
timer_invoked
);
// Wait until the end
_sleep
(
zmq_timers_timeout
(
timers
));
rc
=
zmq_timers_execute
(
timers
);
assert
(
rc
==
0
);
assert
(
timer_invoked
);
timer_invoked
=
false
;
// reschedule
zmq_timers_set_interval
(
timers
,
timer_id
,
50
);
_sleep
(
51
);
rc
=
zmq_timers_execute
(
timers
);
assert
(
rc
==
0
);
assert
(
timer_invoked
);
timer_invoked
=
false
;
// cancel timer
timeout
=
zmq_timers_timeout
(
timers
);
zmq_timers_cancel
(
timers
,
timer_id
);
_sleep
(
timeout
*
2
);
rc
=
zmq_timers_execute
(
timers
);
assert
(
rc
==
0
);
assert
(
!
timer_invoked
);
rc
=
zmq_timers_close
(
timers
);
assert
(
rc
==
0
);
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment