Commit 9b8993ef authored by Martin Sustrik's avatar Martin Sustrik

elementary fixes to the named session

parent 46d70555
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include "named_session.hpp" #include "named_session.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
/*
zmq::named_session_t::named_session_t (class io_thread_t *io_thread_, zmq::named_session_t::named_session_t (class io_thread_t *io_thread_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const blob_t &name_) : const blob_t &name_) :
...@@ -46,31 +45,33 @@ zmq::named_session_t::~named_session_t () ...@@ -46,31 +45,33 @@ zmq::named_session_t::~named_session_t ()
void zmq::named_session_t::detach () void zmq::named_session_t::detach ()
{ {
// TODO: // Clean up the mess left over by the failed connection.
zmq_assert (false); clean_pipes ();
// Do nothing. Wait till the connection comes up again.
} }
void zmq::named_session_t::attached (const blob_t &peer_identity_) void zmq::named_session_t::attached (const blob_t &peer_identity_)
{ {
if (!peer_identity.empty ()) { if (!name.empty ()) {
// If both IDs are temporary, no checking is needed. // If both IDs are temporary, no checking is needed.
// TODO: Old ID should be reused in this case... // TODO: Old ID should be reused in this case...
if (peer_identity.empty () || peer_identity [0] != 0 || if (name.empty () || name [0] != 0 ||
peer_identity_.empty () || peer_identity_ [0] != 0) { peer_identity_.empty () || peer_identity_ [0] != 0) {
// If we already know the peer name do nothing, just check whether // If we already know the peer name do nothing, just check whether
// it haven't changed. // it haven't changed.
zmq_assert (peer_identity == peer_identity_); zmq_assert (name == peer_identity_);
} }
} }
else if (!peer_identity_.empty ()) { else if (!peer_identity_.empty ()) {
// Store the peer identity. // Store the peer identity.
peer_identity = peer_identity_; name = peer_identity_;
// Register the session using the peer name. // Register the session using the peer name.
if (!register_session (peer_identity, this)) { if (!register_session (name, this)) {
// TODO: There's already a session with the specified // TODO: There's already a session with the specified
// identity. We should presumably syslog it and drop the // identity. We should presumably syslog it and drop the
...@@ -82,6 +83,6 @@ void zmq::named_session_t::attached (const blob_t &peer_identity_) ...@@ -82,6 +83,6 @@ void zmq::named_session_t::attached (const blob_t &peer_identity_)
void zmq::named_session_t::detached () void zmq::named_session_t::detached ()
{ {
socket->unregister_session (peer_identity); unregister_session (name);
} }
*/
...@@ -163,31 +163,6 @@ void zmq::session_t::process_plug () ...@@ -163,31 +163,6 @@ void zmq::session_t::process_plug ()
{ {
} }
void zmq::session_t::process_unplug ()
{
// TODO: There may be a problem here. The called ensures that all the
// commands on the fly have been delivered. However, given that the
// session is unregistered from the global repository only at this point
// there may be some commands being sent to the session right now.
// Unregister the session from the socket.
// if (!peer_identity.empty () && peer_identity [0] != 0)
// unregister_session (peer_identity);
// TODO: Should be done in named session.
// Ask associated pipes to terminate.
if (in_pipe)
in_pipe->terminate ();
if (out_pipe)
out_pipe->terminate ();
if (engine) {
engine->unplug ();
delete engine;
engine = NULL;
}
}
void zmq::session_t::finalise () void zmq::session_t::finalise ()
{ {
// If all conditions are met, proceed with termination: // If all conditions are met, proceed with termination:
...@@ -221,7 +196,7 @@ void zmq::session_t::process_attach (i_engine *engine_, ...@@ -221,7 +196,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
} }
if (socket_reader || socket_writer) if (socket_reader || socket_writer)
send_bind (socket, socket_reader, socket_writer, peer_identity); send_bind (socket, socket_reader, socket_writer, peer_identity_);
// Plug in the engine. // Plug in the engine.
zmq_assert (!engine); zmq_assert (!engine);
...@@ -252,6 +227,16 @@ void zmq::session_t::process_term () ...@@ -252,6 +227,16 @@ void zmq::session_t::process_term ()
finalise (); finalise ();
} }
bool zmq::session_t::register_session (const blob_t &name_, session_t *session_)
{
return socket->register_session (name_, session_);
}
void zmq::session_t::unregister_session (const blob_t &name_)
{
socket->unregister_session (name_);
}
void zmq::session_t::attached (const blob_t &peer_identity_) void zmq::session_t::attached (const blob_t &peer_identity_)
{ {
} }
......
...@@ -72,6 +72,10 @@ namespace zmq ...@@ -72,6 +72,10 @@ namespace zmq
virtual void attached (const blob_t &peer_identity_); virtual void attached (const blob_t &peer_identity_);
virtual void detached (); virtual void detached ();
// Allows derives session types to (un)register session names.
bool register_session (const blob_t &name_, class session_t *session_);
void unregister_session (const blob_t &name_);
~session_t (); ~session_t ();
// Remove any half processed messages. Flush unflushed messages. // Remove any half processed messages. Flush unflushed messages.
...@@ -85,7 +89,6 @@ namespace zmq ...@@ -85,7 +89,6 @@ namespace zmq
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
void process_unplug ();
void process_attach (struct i_engine *engine_, void process_attach (struct i_engine *engine_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
void process_term (); void process_term ();
...@@ -110,10 +113,6 @@ namespace zmq ...@@ -110,10 +113,6 @@ namespace zmq
// The protocol I/O engine connected to the session. // The protocol I/O engine connected to the session.
struct i_engine *engine; struct i_engine *engine;
// Identity of the peer (say the component on the other side
// of TCP connection).
blob_t peer_identity;
// The socket the session belongs to. // The socket the session belongs to.
class socket_base_t *socket; class socket_base_t *socket;
......
...@@ -564,29 +564,29 @@ bool zmq::socket_base_t::has_out () ...@@ -564,29 +564,29 @@ bool zmq::socket_base_t::has_out ()
return xhas_out (); return xhas_out ();
} }
bool zmq::socket_base_t::register_session (const blob_t &peer_identity_, bool zmq::socket_base_t::register_session (const blob_t &name_,
session_t *session_) session_t *session_)
{ {
sessions_sync.lock (); sessions_sync.lock ();
bool registered = sessions.insert ( bool registered = sessions.insert (
std::make_pair (peer_identity_, session_)).second; std::make_pair (name_, session_)).second;
sessions_sync.unlock (); sessions_sync.unlock ();
return registered; return registered;
} }
void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_) void zmq::socket_base_t::unregister_session (const blob_t &name_)
{ {
sessions_sync.lock (); sessions_sync.lock ();
sessions_t::iterator it = sessions.find (peer_identity_); sessions_t::iterator it = sessions.find (name_);
zmq_assert (it != sessions.end ()); zmq_assert (it != sessions.end ());
sessions.erase (it); sessions.erase (it);
sessions_sync.unlock (); sessions_sync.unlock ();
} }
zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
{ {
sessions_sync.lock (); sessions_sync.lock ();
sessions_t::iterator it = sessions.find (peer_identity_); sessions_t::iterator it = sessions.find (name_);
if (it == sessions.end ()) { if (it == sessions.end ()) {
sessions_sync.unlock (); sessions_sync.unlock ();
return NULL; return NULL;
......
...@@ -71,10 +71,9 @@ namespace zmq ...@@ -71,10 +71,9 @@ namespace zmq
bool has_out (); bool has_out ();
// Registry of named sessions. // Registry of named sessions.
bool register_session (const blob_t &peer_identity_, bool register_session (const blob_t &name_, class session_t *session_);
class session_t *session_); void unregister_session (const blob_t &name_);
void unregister_session (const blob_t &peer_identity_); class session_t *find_session (const blob_t &name_);
class session_t *find_session (const blob_t &peer_identity_);
// i_reader_events interface implementation. // i_reader_events interface implementation.
void activated (class reader_t *pipe_); void activated (class reader_t *pipe_);
......
...@@ -180,10 +180,8 @@ void zmq::zmq_init_t::finalise_initialisation () ...@@ -180,10 +180,8 @@ void zmq::zmq_init_t::finalise_initialisation ()
} }
// There's no such named session. We have to create one. // There's no such named session. We have to create one.
// TODO: session = new (std::nothrow) named_session_t (io_thread, socket,
zmq_assert (false); options, peer_identity);
// session = new (std::nothrow) named_session_t (io_thread, socket,
// options, peer_identity);
zmq_assert (session); zmq_assert (session);
launch_sibling (session); launch_sibling (session);
engine->unplug (); engine->unplug ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment