Commit 43e34d02 authored by Martin Sustrik's avatar Martin Sustrik

engine leak fixed; pgm compilation fixed

parent 45f83d78
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
#include "connect_session.hpp" #include "connect_session.hpp"
#include "zmq_connecter.hpp" #include "zmq_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_, zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_, class socket_base_t *socket_, const options_t &options_,
...@@ -56,10 +58,10 @@ void zmq::connect_session_t::start_connecting () ...@@ -56,10 +58,10 @@ void zmq::connect_session_t::start_connecting ()
#if defined ZMQ_HAVE_OPENPGM #if defined ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure. // Both PGM and EPGM transports are using the same infrastructure.
if (addr_type == "pgm" || addr_type == "epgm") { if (protocol == "pgm" || protocol == "epgm") {
// For EPGM transport with UDP encapsulation of PGM is used. // For EPGM transport with UDP encapsulation of PGM is used.
bool udp_encapsulation = (addr_type == "epgm"); bool udp_encapsulation = (protocol == "epgm");
// At this point we'll create message pipes to the session straight // At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect' // away. There's no point in delaying it as no concept of 'connect'
...@@ -71,11 +73,8 @@ void zmq::connect_session_t::start_connecting () ...@@ -71,11 +73,8 @@ void zmq::connect_session_t::start_connecting ()
choose_io_thread (options.affinity), options); choose_io_thread (options.affinity), options);
zmq_assert (pgm_sender); zmq_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
if (rc != 0) { zmq_assert (rc == 0);
delete pgm_sender;
return -1;
}
send_attach (this, pgm_sender, blob_t ()); send_attach (this, pgm_sender, blob_t ());
} }
...@@ -86,11 +85,8 @@ void zmq::connect_session_t::start_connecting () ...@@ -86,11 +85,8 @@ void zmq::connect_session_t::start_connecting ()
choose_io_thread (options.affinity), options); choose_io_thread (options.affinity), options);
zmq_assert (pgm_receiver); zmq_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
if (rc != 0) { zmq_assert (rc == 0);
delete pgm_receiver;
return -1;
}
send_attach (this, pgm_receiver, blob_t ()); send_attach (this, pgm_receiver, blob_t ());
} }
......
...@@ -34,6 +34,10 @@ namespace zmq ...@@ -34,6 +34,10 @@ namespace zmq
// Unplug the engine from the session. // Unplug the engine from the session.
virtual void unplug () = 0; virtual void unplug () = 0;
// Terminate and deallocate the engine. Note that 'detached'
// events in not fired on termination.
virtual void terminate () = 0;
// This method is called by the session to signalise that more // This method is called by the session to signalise that more
// messages can be written to the pipe. // messages can be written to the pipe.
virtual void activate_in () = 0; virtual void activate_in () = 0;
......
...@@ -55,7 +55,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) ...@@ -55,7 +55,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
return pgm_socket.init (udp_encapsulation_, network_); return pgm_socket.init (udp_encapsulation_, network_);
} }
void zmq::pgm_receiver_t::plug (i_inout *inout_) void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{ {
// Retrieve PGM fds and start polling. // Retrieve PGM fds and start polling.
int socket_fd; int socket_fd;
...@@ -88,12 +88,18 @@ void zmq::pgm_receiver_t::unplug () ...@@ -88,12 +88,18 @@ void zmq::pgm_receiver_t::unplug ()
inout = NULL; inout = NULL;
} }
void zmq::pgm_receiver_t::revive () void zmq::pgm_receiver_t::terminate ()
{
unplug ();
delete this;
}
void zmq::pgm_receiver_t::activate_out ()
{ {
zmq_assert (false); zmq_assert (false);
} }
void zmq::pgm_receiver_t::resume_input () void zmq::pgm_receiver_t::activate_in ()
{ {
// It is possible that the most recently used decoder // It is possible that the most recently used decoder
// processed the whole buffer but failed to write // processed the whole buffer but failed to write
......
...@@ -51,10 +51,11 @@ namespace zmq ...@@ -51,10 +51,11 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_); int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation. // i_engine interface implementation.
void plug (struct i_inout *inout_); void plug (class io_thread_t *io_thread_, struct i_inout *inout_);
void unplug (); void unplug ();
void revive (); void terminate ();
void resume_input (); void activate_in ();
void activate_out ();
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
......
...@@ -58,7 +58,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) ...@@ -58,7 +58,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
return rc; return rc;
} }
void zmq::pgm_sender_t::plug (i_inout *inout_) void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{ {
// Alocate 2 fds for PGM socket. // Alocate 2 fds for PGM socket.
int downlink_socket_fd = 0; int downlink_socket_fd = 0;
...@@ -96,13 +96,19 @@ void zmq::pgm_sender_t::unplug () ...@@ -96,13 +96,19 @@ void zmq::pgm_sender_t::unplug ()
encoder.set_inout (NULL); encoder.set_inout (NULL);
} }
void zmq::pgm_sender_t::revive () void zmq::pgm_sender_t::terminate ()
{
unplug ();
delete this;
}
void zmq::pgm_sender_t::activate_out ()
{ {
set_pollout (handle); set_pollout (handle);
out_event (); out_event ();
} }
void zmq::pgm_sender_t::resume_input () void zmq::pgm_sender_t::activate_in ()
{ {
zmq_assert (false); zmq_assert (false);
} }
......
...@@ -49,10 +49,11 @@ namespace zmq ...@@ -49,10 +49,11 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_); int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation. // i_engine interface implementation.
void plug (struct i_inout *inout_); void plug (class io_thread_t *io_thread_, struct i_inout *inout_);
void unplug (); void unplug ();
void revive (); void terminate ();
void resume_input (); void activate_in ();
void activate_out ();
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
......
...@@ -45,6 +45,9 @@ zmq::session_t::~session_t () ...@@ -45,6 +45,9 @@ zmq::session_t::~session_t ()
{ {
zmq_assert (!in_pipe); zmq_assert (!in_pipe);
zmq_assert (!out_pipe); zmq_assert (!out_pipe);
if (engine)
engine->terminate ();
} }
void zmq::session_t::terminate () void zmq::session_t::terminate ()
......
...@@ -44,8 +44,6 @@ ...@@ -44,8 +44,6 @@
#include "err.hpp" #include "err.hpp"
#include "ctx.hpp" #include "ctx.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "likely.hpp" #include "likely.hpp"
#include "pair.hpp" #include "pair.hpp"
#include "pub.hpp" #include "pub.hpp"
......
...@@ -87,6 +87,12 @@ void zmq::zmq_engine_t::unplug () ...@@ -87,6 +87,12 @@ void zmq::zmq_engine_t::unplug ()
inout = NULL; inout = NULL;
} }
void zmq::zmq_engine_t::terminate ()
{
unplug ();
delete this;
}
void zmq::zmq_engine_t::in_event () void zmq::zmq_engine_t::in_event ()
{ {
bool disconnection = false; bool disconnection = false;
......
...@@ -44,6 +44,7 @@ namespace zmq ...@@ -44,6 +44,7 @@ namespace zmq
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, struct i_inout *inout_); void plug (class io_thread_t *io_thread_, struct i_inout *inout_);
void unplug (); void unplug ();
void terminate ();
void activate_in (); void activate_in ();
void activate_out (); void activate_out ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment