Commit cf815e8c authored by Martin Sustrik's avatar Martin Sustrik

new interfaces for timers; the implementation is not changed yet

parent be79a9fb
...@@ -69,9 +69,13 @@ namespace zmq ...@@ -69,9 +69,13 @@ namespace zmq
// Maximum number of events the I/O thread can process in one go. // Maximum number of events the I/O thread can process in one go.
max_io_events = 256, max_io_events = 256,
// TODO: To be removed
// Maximal wait time for a timer (milliseconds). // Maximal wait time for a timer (milliseconds).
max_timer_period = 100, max_timer_period = 100,
// How long to wait (milliseconds) till reattempting to connect.
reconnect_period = 100,
// Maximal delay to process command in API thread (in CPU ticks). // Maximal delay to process command in API thread (in CPU ticks).
// 3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs. // 3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs.
// Note that delay is only applied when there is continuous stream of // Note that delay is only applied when there is continuous stream of
......
...@@ -128,12 +128,12 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_) ...@@ -128,12 +128,12 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_)
devpoll_ctl (handle_, fd_table [handle_].events); devpoll_ctl (handle_, fd_table [handle_].events);
} }
void zmq::devpoll_t::add_timer (i_poll_events *events_) void zmq::devpoll_t::add_timer (int timeout_, i_poll_events *events_, int id_)
{ {
timers.push_back (events_); timers.push_back (events_);
} }
void zmq::devpoll_t::cancel_timer (i_poll_events *events_) void zmq::devpoll_t::cancel_timer (i_poll_events *events_, int id_)
{ {
timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
if (it != timers.end ()) if (it != timers.end ())
...@@ -190,7 +190,7 @@ void zmq::devpoll_t::loop () ...@@ -190,7 +190,7 @@ void zmq::devpoll_t::loop ()
// Trigger all the timers. // Trigger all the timers.
for (timers_t::iterator it = t.begin (); it != t.end (); it ++) for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
(*it)->timer_event (); (*it)->timer_event (-1);
continue; continue;
} }
......
...@@ -33,8 +33,7 @@ ...@@ -33,8 +33,7 @@
namespace zmq namespace zmq
{ {
// Implements socket polling mechanism using the Solaris-specific // Implements socket polling mechanism using the "/dev/poll" interface.
// "/dev/poll" interface.
class devpoll_t class devpoll_t
{ {
...@@ -52,8 +51,8 @@ namespace zmq ...@@ -52,8 +51,8 @@ namespace zmq
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (struct i_poll_events *events_); void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_); void cancel_timer (struct i_poll_events *events_, int id_);
int get_load (); int get_load ();
void start (); void start ();
void stop (); void stop ();
......
...@@ -120,12 +120,12 @@ void zmq::epoll_t::reset_pollout (handle_t handle_) ...@@ -120,12 +120,12 @@ void zmq::epoll_t::reset_pollout (handle_t handle_)
errno_assert (rc != -1); errno_assert (rc != -1);
} }
void zmq::epoll_t::add_timer (i_poll_events *events_) void zmq::epoll_t::add_timer (int timeout_, i_poll_events *events_, int id_)
{ {
timers.push_back (events_); timers.push_back (events_);
} }
void zmq::epoll_t::cancel_timer (i_poll_events *events_) void zmq::epoll_t::cancel_timer (i_poll_events *events_, int id_)
{ {
timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
if (it == timers.end ()) if (it == timers.end ())
...@@ -175,7 +175,7 @@ void zmq::epoll_t::loop () ...@@ -175,7 +175,7 @@ void zmq::epoll_t::loop ()
// Trigger all the timers. // Trigger all the timers.
for (timers_t::iterator it = t.begin (); it != t.end (); it ++) for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
(*it)->timer_event (); (*it)->timer_event (-1);
continue; continue;
} }
......
...@@ -53,8 +53,8 @@ namespace zmq ...@@ -53,8 +53,8 @@ namespace zmq
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (struct i_poll_events *events_); void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_); void cancel_timer (struct i_poll_events *events_, int id_);
int get_load (); int get_load ();
void start (); void start ();
void stop (); void stop ();
......
...@@ -37,7 +37,7 @@ namespace zmq ...@@ -37,7 +37,7 @@ namespace zmq
virtual void out_event () = 0; virtual void out_event () = 0;
// Called when timer expires. // Called when timer expires.
virtual void timer_event () = 0; virtual void timer_event (int id_) = 0;
}; };
} }
......
...@@ -80,14 +80,14 @@ void zmq::io_object_t::reset_pollout (handle_t handle_) ...@@ -80,14 +80,14 @@ void zmq::io_object_t::reset_pollout (handle_t handle_)
poller->reset_pollout (handle_); poller->reset_pollout (handle_);
} }
void zmq::io_object_t::add_timer () void zmq::io_object_t::add_timer (int timeout_, int id_)
{ {
poller->add_timer (this); poller->add_timer (timeout_, this, id_);
} }
void zmq::io_object_t::cancel_timer () void zmq::io_object_t::cancel_timer (int id_)
{ {
poller->cancel_timer (this); poller->cancel_timer (this, id_);
} }
void zmq::io_object_t::in_event () void zmq::io_object_t::in_event ()
...@@ -100,7 +100,7 @@ void zmq::io_object_t::out_event () ...@@ -100,7 +100,7 @@ void zmq::io_object_t::out_event ()
zmq_assert (false); zmq_assert (false);
} }
void zmq::io_object_t::timer_event () void zmq::io_object_t::timer_event (int id_)
{ {
zmq_assert (false); zmq_assert (false);
} }
...@@ -56,13 +56,13 @@ namespace zmq ...@@ -56,13 +56,13 @@ namespace zmq
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (); void add_timer (int timout_, int id_);
void cancel_timer (); void cancel_timer (int id_);
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
void out_event (); void out_event ();
void timer_event (); void timer_event (int id_);
private: private:
......
...@@ -89,7 +89,7 @@ void zmq::io_thread_t::out_event () ...@@ -89,7 +89,7 @@ void zmq::io_thread_t::out_event ()
zmq_assert (false); zmq_assert (false);
} }
void zmq::io_thread_t::timer_event () void zmq::io_thread_t::timer_event (int id_)
{ {
// No timers here. This function is never called. // No timers here. This function is never called.
zmq_assert (false); zmq_assert (false);
......
...@@ -56,7 +56,7 @@ namespace zmq ...@@ -56,7 +56,7 @@ namespace zmq
// i_poll_events implementation. // i_poll_events implementation.
void in_event (); void in_event ();
void out_event (); void out_event ();
void timer_event (); void timer_event (int id_);
// Used by io_objects to retrieve the assciated poller object. // Used by io_objects to retrieve the assciated poller object.
poller_t *get_poller (); poller_t *get_poller ();
......
...@@ -132,12 +132,12 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_) ...@@ -132,12 +132,12 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_)
kevent_delete (pe->fd, EVFILT_WRITE); kevent_delete (pe->fd, EVFILT_WRITE);
} }
void zmq::kqueue_t::add_timer (i_poll_events *events_) void zmq::kqueue_t::add_timer (int timeout_, i_poll_events *events_, int id_)
{ {
timers.push_back (events_); timers.push_back (events_);
} }
void zmq::kqueue_t::cancel_timer (i_poll_events *events_) void zmq::kqueue_t::cancel_timer (i_poll_events *events_, int id_)
{ {
timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
if (it != timers.end ()) if (it != timers.end ())
...@@ -186,7 +186,7 @@ void zmq::kqueue_t::loop () ...@@ -186,7 +186,7 @@ void zmq::kqueue_t::loop ()
// Trigger all the timers. // Trigger all the timers.
for (timers_t::iterator it = t.begin (); it != t.end (); it ++) for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
(*it)->timer_event (); (*it)->timer_event (-1);
continue; continue;
} }
......
...@@ -53,8 +53,8 @@ namespace zmq ...@@ -53,8 +53,8 @@ namespace zmq
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (struct i_poll_events *events_); void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_); void cancel_timer (struct i_poll_events *events_, int id_);
int get_load (); int get_load ();
void start (); void start ();
void stop (); void stop ();
......
...@@ -112,12 +112,12 @@ void zmq::poll_t::reset_pollout (handle_t handle_) ...@@ -112,12 +112,12 @@ void zmq::poll_t::reset_pollout (handle_t handle_)
pollset [index].events &= ~((short) POLLOUT); pollset [index].events &= ~((short) POLLOUT);
} }
void zmq::poll_t::add_timer (i_poll_events *events_) void zmq::poll_t::add_timer (int timeout_, i_poll_events *events_, int id_)
{ {
timers.push_back (events_); timers.push_back (events_);
} }
void zmq::poll_t::cancel_timer (i_poll_events *events_) void zmq::poll_t::cancel_timer (i_poll_events *events_, int id_)
{ {
timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
if (it != timers.end ()) if (it != timers.end ())
...@@ -160,7 +160,7 @@ void zmq::poll_t::loop () ...@@ -160,7 +160,7 @@ void zmq::poll_t::loop ()
// Trigger all the timers. // Trigger all the timers.
for (timers_t::iterator it = t.begin (); it != t.end (); it ++) for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
(*it)->timer_event (); (*it)->timer_event (-1);
continue; continue;
} }
......
...@@ -58,8 +58,8 @@ namespace zmq ...@@ -58,8 +58,8 @@ namespace zmq
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (struct i_poll_events *events_); void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_); void cancel_timer (struct i_poll_events *events_, int id_);
int get_load (); int get_load ();
void start (); void start ();
void stop (); void stop ();
......
...@@ -136,12 +136,12 @@ void zmq::select_t::reset_pollout (handle_t handle_) ...@@ -136,12 +136,12 @@ void zmq::select_t::reset_pollout (handle_t handle_)
FD_CLR (handle_, &source_set_out); FD_CLR (handle_, &source_set_out);
} }
void zmq::select_t::add_timer (i_poll_events *events_) void zmq::select_t::add_timer (int timeout_, i_poll_events *events_, int id_)
{ {
timers.push_back (events_); timers.push_back (events_);
} }
void zmq::select_t::cancel_timer (i_poll_events *events_) void zmq::select_t::cancel_timer (i_poll_events *events_, int id_)
{ {
timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
if (it != timers.end ()) if (it != timers.end ())
...@@ -199,7 +199,7 @@ void zmq::select_t::loop () ...@@ -199,7 +199,7 @@ void zmq::select_t::loop ()
// Trigger all the timers. // Trigger all the timers.
for (timers_t::iterator it = t.begin (); it != t.end (); it ++) for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
(*it)->timer_event (); (*it)->timer_event (-1);
continue; continue;
} }
......
...@@ -60,8 +60,8 @@ namespace zmq ...@@ -60,8 +60,8 @@ namespace zmq
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (struct i_poll_events *events_); void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_); void cancel_timer (struct i_poll_events *events_, int id_);
int get_load (); int get_load ();
void start (); void start ();
void stop (); void stop ();
......
...@@ -42,7 +42,7 @@ zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_, ...@@ -42,7 +42,7 @@ zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_,
zmq::zmq_connecter_t::~zmq_connecter_t () zmq::zmq_connecter_t::~zmq_connecter_t ()
{ {
if (wait) if (wait)
cancel_timer (); cancel_timer (reconnect_timer_id);
if (handle_valid) if (handle_valid)
rm_fd (handle); rm_fd (handle);
} }
...@@ -50,7 +50,7 @@ zmq::zmq_connecter_t::~zmq_connecter_t () ...@@ -50,7 +50,7 @@ zmq::zmq_connecter_t::~zmq_connecter_t ()
void zmq::zmq_connecter_t::process_plug () void zmq::zmq_connecter_t::process_plug ()
{ {
if (wait) if (wait)
add_timer (); add_timer (reconnect_period, reconnect_timer_id);
else else
start_connecting (); start_connecting ();
} }
...@@ -73,7 +73,7 @@ void zmq::zmq_connecter_t::out_event () ...@@ -73,7 +73,7 @@ void zmq::zmq_connecter_t::out_event ()
if (fd == retired_fd) { if (fd == retired_fd) {
tcp_connecter.close (); tcp_connecter.close ();
wait = true; wait = true;
add_timer (); add_timer (reconnect_period, reconnect_timer_id);
return; return;
} }
...@@ -121,5 +121,5 @@ void zmq::zmq_connecter_t::start_connecting () ...@@ -121,5 +121,5 @@ void zmq::zmq_connecter_t::start_connecting ()
// Handle any other error condition by eventual reconnect. // Handle any other error condition by eventual reconnect.
wait = true; wait = true;
add_timer (); add_timer (reconnect_period, reconnect_timer_id);
} }
...@@ -42,6 +42,9 @@ namespace zmq ...@@ -42,6 +42,9 @@ namespace zmq
private: private:
// ID of the timer used to delay the reconnection.
enum {reconnect_timer_id = 1};
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment