Commit 9d96e003 authored by Martin Sustrik's avatar Martin Sustrik

Clean-up of the code related to attaching/detaching engines to sessions.

Session base class now handles the engine events exclusively. It notifies
derived session types using dedicated "attached" and "detached" events.

Couple of bugs was fixed along the way.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 5ae878b8
...@@ -105,14 +105,13 @@ void zmq::connect_session_t::start_connecting () ...@@ -105,14 +105,13 @@ void zmq::connect_session_t::start_connecting ()
zmq_assert (false); zmq_assert (false);
} }
void zmq::connect_session_t::detached () void zmq::connect_session_t::attached (const blob_t &peer_identity_)
{ {
// Clean up the mess left over by the failed connection. }
clean_pipes ();
void zmq::connect_session_t::detached ()
{
// Reconnect. // Reconnect.
start_connecting (); start_connecting ();
session_t::detached ();
} }
...@@ -41,7 +41,8 @@ namespace zmq ...@@ -41,7 +41,8 @@ namespace zmq
private: private:
// Hook into session's disconnection mechanism. // Handlers for events from session base class.
void attached (const blob_t &peer_identity_);
void detached (); void detached ();
// Start the connection process. // Start the connection process.
......
...@@ -41,14 +41,9 @@ zmq::named_session_t::named_session_t (class io_thread_t *io_thread_, ...@@ -41,14 +41,9 @@ zmq::named_session_t::named_session_t (class io_thread_t *io_thread_,
zmq::named_session_t::~named_session_t () zmq::named_session_t::~named_session_t ()
{ {
} // Unregister the session from the global list of named sessions.
if (!name.empty ())
void zmq::named_session_t::detach () unregister_session (name);
{
// Clean up the mess left over by the failed connection.
clean_pipes ();
// Do nothing. Wait till the connection comes up again.
} }
void zmq::named_session_t::attached (const blob_t &peer_identity_) void zmq::named_session_t::attached (const blob_t &peer_identity_)
...@@ -83,7 +78,7 @@ void zmq::named_session_t::attached (const blob_t &peer_identity_) ...@@ -83,7 +78,7 @@ void zmq::named_session_t::attached (const blob_t &peer_identity_)
void zmq::named_session_t::detached () void zmq::named_session_t::detached ()
{ {
unregister_session (name); // Do nothing. Named sessions are never destroyed because of disconnection,
session_t::detached (); // neither they have to actively reconnect.
} }
...@@ -38,10 +38,7 @@ namespace zmq ...@@ -38,10 +38,7 @@ namespace zmq
const blob_t &name_); const blob_t &name_);
~named_session_t (); ~named_session_t ();
// i_inout interface implementation. // Handlers for events from session base class.
void detach ();
// Handle events from session_t base class.
void attached (const blob_t &peer_identity_); void attached (const blob_t &peer_identity_);
void detached (); void detached ();
......
...@@ -260,7 +260,15 @@ void zmq::session_t::detach () ...@@ -260,7 +260,15 @@ void zmq::session_t::detach ()
// Engine is dead. Let's forget about it. // Engine is dead. Let's forget about it.
engine = NULL; engine = NULL;
// Remove any half-done messages from the pipes.
clean_pipes ();
// Send the event to the derived class.
detached (); detached ();
// Just in case, there's only a delimiter in the inbound pipe.
if (in_pipe)
in_pipe->check_read ();
} }
void zmq::session_t::process_term () void zmq::session_t::process_term ()
...@@ -293,16 +301,6 @@ void zmq::session_t::unregister_session (const blob_t &name_) ...@@ -293,16 +301,6 @@ void zmq::session_t::unregister_session (const blob_t &name_)
socket->unregister_session (name_); socket->unregister_session (name_);
} }
void zmq::session_t::attached (const blob_t &peer_identity_)
{
}
void zmq::session_t::detached ()
{
if (in_pipe)
in_pipe->check_read ();
}
void zmq::session_t::terminate () void zmq::session_t::terminate ()
{ {
force_terminate = true; force_terminate = true;
......
...@@ -70,8 +70,8 @@ namespace zmq ...@@ -70,8 +70,8 @@ namespace zmq
// when session is attached to a peer, detached is triggered at the // when session is attached to a peer, detached is triggered at the
// beginning of the termination process when session is about to // beginning of the termination process when session is about to
// be detached from the peer. // be detached from the peer.
virtual void attached (const blob_t &peer_identity_); virtual void attached (const blob_t &peer_identity_) = 0;
virtual void detached (); virtual void detached () = 0;
// Allows derives session types to (un)register session names. // Allows derives session types to (un)register session names.
bool register_session (const blob_t &name_, class session_t *session_); bool register_session (const blob_t &name_, class session_t *session_);
...@@ -79,10 +79,6 @@ namespace zmq ...@@ -79,10 +79,6 @@ namespace zmq
~session_t (); ~session_t ();
// Remove any half processed messages. Flush unflushed messages.
// Call this function when engine disconnect to get rid of leftovers.
void clean_pipes ();
// Inherited socket options. These are visible to all session classes. // Inherited socket options. These are visible to all session classes.
options_t options; options_t options;
...@@ -94,6 +90,10 @@ namespace zmq ...@@ -94,6 +90,10 @@ namespace zmq
const blob_t &peer_identity_); const blob_t &peer_identity_);
void process_term (); void process_term ();
// Remove any half processed messages. Flush unflushed messages.
// Call this function when engine disconnect to get rid of leftovers.
void clean_pipes ();
// Call this function to move on with the delayed process_term. // Call this function to move on with the delayed process_term.
void proceed_with_term (); void proceed_with_term ();
......
...@@ -29,6 +29,10 @@ zmq::transient_session_t::~transient_session_t () ...@@ -29,6 +29,10 @@ zmq::transient_session_t::~transient_session_t ()
{ {
} }
void zmq::transient_session_t::attached (const blob_t &peer_identity_)
{
}
void zmq::transient_session_t::detached () void zmq::transient_session_t::detached ()
{ {
// There's no way to reestablish a transient session. Tear it down. // There's no way to reestablish a transient session. Tear it down.
......
...@@ -38,7 +38,8 @@ namespace zmq ...@@ -38,7 +38,8 @@ namespace zmq
private: private:
// Hook into session's disconnection mechanism. // Handlers for events from session base class.
void attached (const blob_t &peer_identity_);
void detached (); void detached ();
transient_session_t (const transient_session_t&); transient_session_t (const transient_session_t&);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment