Commit 1d239972 authored by Martin Sustrik's avatar Martin Sustrik

zmq_init_t destroyed zmq_engine_t before plugging it out from the poller first -- fixed

parent cda3c96a
...@@ -40,7 +40,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : ...@@ -40,7 +40,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
outsize (0), outsize (0),
encoder (out_batch_size), encoder (out_batch_size),
inout (NULL), inout (NULL),
options (options_) options (options_),
plugged (false)
{ {
// Initialise the underlying socket. // Initialise the underlying socket.
int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
...@@ -49,10 +50,14 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : ...@@ -49,10 +50,14 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
zmq::zmq_engine_t::~zmq_engine_t () zmq::zmq_engine_t::~zmq_engine_t ()
{ {
zmq_assert (!plugged);
} }
void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{ {
zmq_assert (!plugged);
plugged = true;
// Conncet to session/init object. // Conncet to session/init object.
zmq_assert (!inout); zmq_assert (!inout);
zmq_assert (inout_); zmq_assert (inout_);
...@@ -72,6 +77,9 @@ void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) ...@@ -72,6 +77,9 @@ void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_)
void zmq::zmq_engine_t::unplug () void zmq::zmq_engine_t::unplug ()
{ {
zmq_assert (plugged);
plugged = false;
// Cancel all fd subscriptions. // Cancel all fd subscriptions.
rm_fd (handle); rm_fd (handle);
......
...@@ -39,7 +39,6 @@ namespace zmq ...@@ -39,7 +39,6 @@ namespace zmq
public: public:
zmq_engine_t (fd_t fd_, const options_t &options_); zmq_engine_t (fd_t fd_, const options_t &options_);
~zmq_engine_t ();
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, struct i_inout *inout_); void plug (class io_thread_t *io_thread_, struct i_inout *inout_);
...@@ -54,6 +53,10 @@ namespace zmq ...@@ -54,6 +53,10 @@ namespace zmq
private: private:
// Destructor is not to be used directly.
// Use 'terminate' function instead.
~zmq_engine_t ();
// Function to handle network disconnections. // Function to handle network disconnections.
void error (); void error ();
...@@ -72,6 +75,8 @@ namespace zmq ...@@ -72,6 +75,8 @@ namespace zmq
options_t options; options_t options;
bool plugged;
zmq_engine_t (const zmq_engine_t&); zmq_engine_t (const zmq_engine_t&);
void operator = (const zmq_engine_t&); void operator = (const zmq_engine_t&);
}; };
......
...@@ -49,7 +49,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, ...@@ -49,7 +49,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
zmq::zmq_init_t::~zmq_init_t () zmq::zmq_init_t::~zmq_init_t ()
{ {
if (engine) if (engine)
delete engine; engine->terminate ();
} }
bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment