Commit b64b50ae authored by Martin Sustrik's avatar Martin Sustrik

Timers correctly canceled by PGM engines on shutdown.

Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent e288f7a3
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_) : const options_t &options_) :
io_object_t (parent_), io_object_t (parent_),
has_rx_timer (false),
pgm_socket (true, options_), pgm_socket (true, options_),
options (options_), options (options_),
inout (NULL), inout (NULL),
...@@ -81,7 +82,11 @@ void zmq::pgm_receiver_t::unplug () ...@@ -81,7 +82,11 @@ void zmq::pgm_receiver_t::unplug ()
mru_decoder = NULL; mru_decoder = NULL;
pending_bytes = 0; pending_bytes = 0;
// Stop polling. if (has_rx_timer) {
cancel_timer (rx_timer_id);
has_rx_timer = false;
}
rm_fd (socket_handle); rm_fd (socket_handle);
rm_fd (pipe_handle); rm_fd (pipe_handle);
...@@ -150,8 +155,7 @@ void zmq::pgm_receiver_t::in_event () ...@@ -150,8 +155,7 @@ void zmq::pgm_receiver_t::in_event ()
// No data to process. This may happen if the packet received is // No data to process. This may happen if the packet received is
// neither ODATA nor ODATA. // neither ODATA nor ODATA.
if (received == 0) { if (received == 0) {
const int last_errno = errno; if (errno == ENOMEM || errno == EBUSY) {
if (last_errno == ENOMEM || last_errno == EBUSY) {
const long timeout = pgm_socket.get_rx_timeout (); const long timeout = pgm_socket.get_rx_timeout ();
add_timer (timeout, rx_timer_id); add_timer (timeout, rx_timer_id);
has_rx_timer = true; has_rx_timer = true;
......
...@@ -36,6 +36,8 @@ ...@@ -36,6 +36,8 @@
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
const options_t &options_) : const options_t &options_) :
io_object_t (parent_), io_object_t (parent_),
has_tx_timer (false),
has_rx_timer (false),
encoder (0), encoder (0),
pgm_socket (false, options_), pgm_socket (false, options_),
options (options_), options (options_),
...@@ -89,6 +91,16 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_) ...@@ -89,6 +91,16 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_)
void zmq::pgm_sender_t::unplug () void zmq::pgm_sender_t::unplug ()
{ {
if (has_rx_timer) {
cancel_timer (rx_timer_id);
has_rx_timer = false;
}
if (has_tx_timer) {
cancel_timer (tx_timer_id);
has_tx_timer = false;
}
rm_fd (handle); rm_fd (handle);
rm_fd (uplink_handle); rm_fd (uplink_handle);
rm_fd (rdata_notify_handle); rm_fd (rdata_notify_handle);
...@@ -128,10 +140,9 @@ void zmq::pgm_sender_t::in_event () ...@@ -128,10 +140,9 @@ void zmq::pgm_sender_t::in_event ()
has_rx_timer = false; has_rx_timer = false;
} }
// In event on sender side means NAK or SPMR receiving from some peer. // In-event on sender side means NAK or SPMR receiving from some peer.
pgm_socket.process_upstream (); pgm_socket.process_upstream ();
const int last_errno = errno; if (errno == ENOMEM || errno == EBUSY) {
if (last_errno == ENOMEM || last_errno == EBUSY) {
const long timeout = pgm_socket.get_rx_timeout (); const long timeout = pgm_socket.get_rx_timeout ();
add_timer (timeout, rx_timer_id); add_timer (timeout, rx_timer_id);
has_rx_timer = true; has_rx_timer = true;
......
...@@ -66,7 +66,8 @@ namespace zmq ...@@ -66,7 +66,8 @@ namespace zmq
enum {tx_timer_id = 0xa0, rx_timer_id = 0xa1}; enum {tx_timer_id = 0xa0, rx_timer_id = 0xa1};
// Timers are running. // Timers are running.
bool has_tx_timer, has_rx_timer; bool has_tx_timer;
bool has_rx_timer;
// Message encoder. // Message encoder.
encoder_t encoder; encoder_t encoder;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment