Commit 563651e0 authored by Simon Giesecke's avatar Simon Giesecke

Problem: stream_engine_t instance may access its fields after it deleted itself

Solution: prevent access to data if the object was deleted
parent 05e400a3
...@@ -50,7 +50,10 @@ struct i_engine ...@@ -50,7 +50,10 @@ struct i_engine
// This method is called by the session to signalise that more // This method is called by the session to signalise that more
// messages can be written to the pipe. // messages can be written to the pipe.
virtual void restart_input () = 0; // Returns false if the engine was deleted due to an error.
// TODO it is probably better to change the design such that the engine
// does not delete itself
virtual bool restart_input () = 0;
// This method is called by the session to signalise that there // This method is called by the session to signalise that there
// are messages to send available. // are messages to send available.
......
...@@ -382,7 +382,7 @@ void zmq::norm_engine_t::in_event () ...@@ -382,7 +382,7 @@ void zmq::norm_engine_t::in_event ()
} }
} // zmq::norm_engine_t::in_event() } // zmq::norm_engine_t::in_event()
void zmq::norm_engine_t::restart_input () bool zmq::norm_engine_t::restart_input ()
{ {
// TBD - should we check/assert that zmq_input_ready was false??? // TBD - should we check/assert that zmq_input_ready was false???
zmq_input_ready = true; zmq_input_ready = true;
...@@ -390,6 +390,7 @@ void zmq::norm_engine_t::restart_input () ...@@ -390,6 +390,7 @@ void zmq::norm_engine_t::restart_input ()
if (!msg_ready_list.IsEmpty ()) if (!msg_ready_list.IsEmpty ())
recv_data (NORM_OBJECT_INVALID); recv_data (NORM_OBJECT_INVALID);
return true;
} // end zmq::norm_engine_t::restart_input() } // end zmq::norm_engine_t::restart_input()
void zmq::norm_engine_t::recv_data (NormObjectHandle object) void zmq::norm_engine_t::recv_data (NormObjectHandle object)
......
...@@ -39,7 +39,7 @@ class norm_engine_t : public io_object_t, public i_engine ...@@ -39,7 +39,7 @@ class norm_engine_t : public io_object_t, public i_engine
// This method is called by the session to signalise that more // This method is called by the session to signalise that more
// messages can be written to the pipe. // messages can be written to the pipe.
virtual void restart_input (); virtual bool restart_input ();
// This method is called by the session to signalise that there // This method is called by the session to signalise that there
// are messages to send available. // are messages to send available.
......
...@@ -116,7 +116,7 @@ void zmq::pgm_receiver_t::restart_output () ...@@ -116,7 +116,7 @@ void zmq::pgm_receiver_t::restart_output ()
drop_subscriptions (); drop_subscriptions ();
} }
void zmq::pgm_receiver_t::restart_input () bool zmq::pgm_receiver_t::restart_input ()
{ {
zmq_assert (session != NULL); zmq_assert (session != NULL);
zmq_assert (active_tsi != NULL); zmq_assert (active_tsi != NULL);
...@@ -135,7 +135,7 @@ void zmq::pgm_receiver_t::restart_input () ...@@ -135,7 +135,7 @@ void zmq::pgm_receiver_t::restart_input ()
// HWM reached; we will try later. // HWM reached; we will try later.
if (errno == EAGAIN) { if (errno == EAGAIN) {
session->flush (); session->flush ();
return; return true;
} }
// Data error. Delete message decoder, mark the // Data error. Delete message decoder, mark the
// peer as not joined and drop remaining data. // peer as not joined and drop remaining data.
...@@ -151,6 +151,8 @@ void zmq::pgm_receiver_t::restart_input () ...@@ -151,6 +151,8 @@ void zmq::pgm_receiver_t::restart_input ()
active_tsi = NULL; active_tsi = NULL;
in_event (); in_event ();
return true;
} }
const char *zmq::pgm_receiver_t::get_endpoint () const const char *zmq::pgm_receiver_t::get_endpoint () const
......
...@@ -57,7 +57,7 @@ class pgm_receiver_t : public io_object_t, public i_engine ...@@ -57,7 +57,7 @@ class pgm_receiver_t : public io_object_t, public i_engine
// i_engine interface implementation. // i_engine interface implementation.
void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
void terminate (); void terminate ();
void restart_input (); bool restart_input ();
void restart_output (); void restart_output ();
void zap_msg_available () {} void zap_msg_available () {}
const char *get_endpoint () const; const char *get_endpoint () const;
......
...@@ -137,9 +137,10 @@ void zmq::pgm_sender_t::restart_output () ...@@ -137,9 +137,10 @@ void zmq::pgm_sender_t::restart_output ()
out_event (); out_event ();
} }
void zmq::pgm_sender_t::restart_input () bool zmq::pgm_sender_t::restart_input ()
{ {
zmq_assert (false); zmq_assert (false);
return true;
} }
const char *zmq::pgm_sender_t::get_endpoint () const const char *zmq::pgm_sender_t::get_endpoint () const
......
...@@ -56,7 +56,7 @@ class pgm_sender_t : public io_object_t, public i_engine ...@@ -56,7 +56,7 @@ class pgm_sender_t : public io_object_t, public i_engine
// i_engine interface implementation. // i_engine interface implementation.
void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
void terminate (); void terminate ();
void restart_input (); bool restart_input ();
void restart_output (); void restart_output ();
void zap_msg_available () {} void zap_msg_available () {}
const char *get_endpoint () const; const char *get_endpoint () const;
......
...@@ -441,7 +441,7 @@ void zmq::stream_engine_t::restart_output () ...@@ -441,7 +441,7 @@ void zmq::stream_engine_t::restart_output ()
out_event (); out_event ();
} }
void zmq::stream_engine_t::restart_input () bool zmq::stream_engine_t::restart_input ()
{ {
zmq_assert (_input_stopped); zmq_assert (_input_stopped);
zmq_assert (_session != NULL); zmq_assert (_session != NULL);
...@@ -451,9 +451,11 @@ void zmq::stream_engine_t::restart_input () ...@@ -451,9 +451,11 @@ void zmq::stream_engine_t::restart_input ()
if (rc == -1) { if (rc == -1) {
if (errno == EAGAIN) if (errno == EAGAIN)
_session->flush (); _session->flush ();
else else {
error (protocol_error); error (protocol_error);
return; return false;
}
return true;
} }
while (_insize > 0) { while (_insize > 0) {
...@@ -471,10 +473,14 @@ void zmq::stream_engine_t::restart_input () ...@@ -471,10 +473,14 @@ void zmq::stream_engine_t::restart_input ()
if (rc == -1 && errno == EAGAIN) if (rc == -1 && errno == EAGAIN)
_session->flush (); _session->flush ();
else if (_io_error) else if (_io_error) {
error (connection_error); error (connection_error);
else if (rc == -1) return false;
} else if (rc == -1) {
error (protocol_error); error (protocol_error);
return false;
}
else { else {
_input_stopped = false; _input_stopped = false;
set_pollin (_handle); set_pollin (_handle);
...@@ -483,6 +489,8 @@ void zmq::stream_engine_t::restart_input () ...@@ -483,6 +489,8 @@ void zmq::stream_engine_t::restart_input ()
// Speculative read. // Speculative read.
in_event (); in_event ();
} }
return true;
} }
bool zmq::stream_engine_t::handshake () bool zmq::stream_engine_t::handshake ()
...@@ -814,7 +822,8 @@ void zmq::stream_engine_t::zap_msg_available () ...@@ -814,7 +822,8 @@ void zmq::stream_engine_t::zap_msg_available ()
return; return;
} }
if (_input_stopped) if (_input_stopped)
restart_input (); if (!restart_input ())
return;
if (_output_stopped) if (_output_stopped)
restart_output (); restart_output ();
} }
......
...@@ -76,7 +76,7 @@ class stream_engine_t : public io_object_t, public i_engine ...@@ -76,7 +76,7 @@ class stream_engine_t : public io_object_t, public i_engine
// i_engine interface implementation. // i_engine interface implementation.
void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
void terminate (); void terminate ();
void restart_input (); bool restart_input ();
void restart_output (); void restart_output ();
void zap_msg_available (); void zap_msg_available ();
const char *get_endpoint () const; const char *get_endpoint () const;
......
...@@ -537,11 +537,12 @@ void zmq::udp_engine_t::in_event () ...@@ -537,11 +537,12 @@ void zmq::udp_engine_t::in_event ()
_session->flush (); _session->flush ();
} }
void zmq::udp_engine_t::restart_input () bool zmq::udp_engine_t::restart_input ()
{ {
if (!_recv_enabled) if (_recv_enabled) {
return; set_pollin (_handle);
in_event ();
}
set_pollin (_handle); return true;
in_event ();
} }
...@@ -32,7 +32,7 @@ class udp_engine_t : public io_object_t, public i_engine ...@@ -32,7 +32,7 @@ class udp_engine_t : public io_object_t, public i_engine
// This method is called by the session to signalise that more // This method is called by the session to signalise that more
// messages can be written to the pipe. // messages can be written to the pipe.
void restart_input (); bool restart_input ();
// This method is called by the session to signalise that there // This method is called by the session to signalise that there
// are messages to send available. // are messages to send available.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment