Commit 328c92a0 authored by Martin Sustrik's avatar Martin Sustrik

problem with engine being attached to session while it's being terminated fixed

parent 1d239972
...@@ -47,7 +47,7 @@ namespace zmq ...@@ -47,7 +47,7 @@ namespace zmq
zmq_assert (buf); zmq_assert (buf);
} }
// The destructor doesn't have to be virtual. It is mad virtual // The destructor doesn't have to be virtual. It is made virtual
// just to keep ICC and code checking tools from complaining. // just to keep ICC and code checking tools from complaining.
inline virtual ~encoder_base_t () inline virtual ~encoder_base_t ()
{ {
......
...@@ -36,7 +36,8 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, ...@@ -36,7 +36,8 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
socket (socket_), socket (socket_),
io_thread (io_thread_), io_thread (io_thread_),
attach_processed (false), attach_processed (false),
term_processed (false) term_processed (false),
finalised (false)
{ {
} }
...@@ -123,19 +124,46 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_, ...@@ -123,19 +124,46 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_,
out_pipe->set_event_sink (this); out_pipe->set_event_sink (this);
} }
// If we are already terminating, terminate the pipes straight away.
if (finalised) {
if (in_pipe) {
register_term_acks (1);
in_pipe->terminate ();
}
if (out_pipe) {
register_term_acks (1);
out_pipe->terminate ();
}
return;
}
attach_processed = true; attach_processed = true;
finalise (); finalise ();
} }
void zmq::session_t::terminated (reader_t *pipe_) void zmq::session_t::terminated (reader_t *pipe_)
{ {
zmq_assert (in_pipe == pipe_);
in_pipe = NULL; in_pipe = NULL;
if (finalised) {
unregister_term_ack ();
return;
}
finalise (); finalise ();
} }
void zmq::session_t::terminated (writer_t *pipe_) void zmq::session_t::terminated (writer_t *pipe_)
{ {
zmq_assert (out_pipe == pipe_);
out_pipe = NULL; out_pipe = NULL;
if (finalised) {
unregister_term_ack ();
return;
}
finalise (); finalise ();
} }
...@@ -173,8 +201,10 @@ void zmq::session_t::finalise () ...@@ -173,8 +201,10 @@ void zmq::session_t::finalise ()
// 3. Both pipes have already terminated. Note that inbound pipe // 3. Both pipes have already terminated. Note that inbound pipe
// is terminated after delimiter is read, i.e. all messages // is terminated after delimiter is read, i.e. all messages
// were already sent to the wire. // were already sent to the wire.
if (term_processed && attach_processed && !in_pipe && !out_pipe) if (term_processed && attach_processed && !in_pipe && !out_pipe) {
finalised = true;
own_t::process_term (); own_t::process_term ();
}
} }
void zmq::session_t::process_attach (i_engine *engine_, void zmq::session_t::process_attach (i_engine *engine_,
...@@ -188,6 +218,12 @@ void zmq::session_t::process_attach (i_engine *engine_, ...@@ -188,6 +218,12 @@ void zmq::session_t::process_attach (i_engine *engine_,
return; return;
} }
// If we are already terminating, we destroy the engine straight away.
if (finalised) {
delete engine;
return;
}
// Check whether the required pipes already exist. If not so, we'll // Check whether the required pipes already exist. If not so, we'll
// create them and bind them to the socket object. // create them and bind them to the socket object.
reader_t *socket_reader = NULL; reader_t *socket_reader = NULL;
......
...@@ -123,6 +123,8 @@ namespace zmq ...@@ -123,6 +123,8 @@ namespace zmq
// True if term command was already processed. // True if term command was already processed.
bool term_processed; bool term_processed;
bool finalised;
session_t (const session_t&); session_t (const session_t&);
void operator = (const session_t&); void operator = (const session_t&);
}; };
......
...@@ -39,6 +39,7 @@ namespace zmq ...@@ -39,6 +39,7 @@ namespace zmq
public: public:
zmq_engine_t (fd_t fd_, const options_t &options_); zmq_engine_t (fd_t fd_, const options_t &options_);
~zmq_engine_t ();
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, struct i_inout *inout_); void plug (class io_thread_t *io_thread_, struct i_inout *inout_);
...@@ -53,10 +54,6 @@ namespace zmq ...@@ -53,10 +54,6 @@ namespace zmq
private: private:
// Destructor is not to be used directly.
// Use 'terminate' function instead.
~zmq_engine_t ();
// Function to handle network disconnections. // Function to handle network disconnections.
void error (); void error ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment