Commit cdc2efe9 authored by Martin Sustrik's avatar Martin Sustrik

Multi-hop REQ/REP, part VII., identity-related algorithms rewritten

parent 923eacd2
......@@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
out_pipe (NULL),
engine (NULL),
options (options_)
{
type = unnamed;
{
// It's possible to register the session at this point as it will be
// searched for only on reconnect, i.e. no race condition (session found
// before it is plugged into it's I/O thread) is possible.
......@@ -52,13 +50,20 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
ordinal (0),
options (options_)
{
if (!peer_identity_size_)
// If peer identity is not supplied, leave it empty.
if (peer_identity_size_) {
type = named;
peer_identity.assign ((char*) peer_identity_, peer_identity_size_);
}
else {
type = transient;
// TODO: Generate unique name here.
if (!owner->register_session (peer_identity_size_, peer_identity_,
this)) {
// TODO: There's already a session with the specified
// identity. We should presumably syslog it and drop the
// session.
zmq_assert (false);
}
}
}
......@@ -104,7 +109,7 @@ void zmq::session_t::detach (owned_t *reconnecter_)
engine = NULL;
// Terminate transient session.
if (type == transient)
if (!ordinal && peer_identity.empty ())
term ();
}
......@@ -120,7 +125,6 @@ class zmq::socket_base_t *zmq::session_t::get_owner ()
uint64_t zmq::session_t::get_ordinal ()
{
zmq_assert (type == unnamed);
zmq_assert (ordinal);
return ordinal;
}
......@@ -168,13 +172,62 @@ void zmq::session_t::revive (reader_t *pipe_)
void zmq::session_t::process_plug ()
{
// Register the session with the socket.
}
void zmq::session_t::process_unplug ()
{
// Unregister the session from the socket.
if (ordinal)
owner->unregister_session (ordinal);
else if (!peer_identity.empty ())
owner->unregister_session ((unsigned char) peer_identity.size (),
(unsigned char*) peer_identity.data ());
// Ask associated pipes to terminate.
if (in_pipe) {
in_pipe->term ();
in_pipe = NULL;
}
if (out_pipe) {
out_pipe->term ();
out_pipe = NULL;
}
if (engine) {
engine->unplug ();
delete engine;
engine = NULL;
}
}
void zmq::session_t::process_attach (i_engine *engine_,
unsigned char peer_identity_size_, unsigned char *peer_identity_)
{
if (!peer_identity.empty ()) {
bool ok = owner->register_session (peer_identity.c_str (), this);
// There's already a session with the specified identity.
// We should syslog it and drop the session. TODO
zmq_assert (ok);
// If we already know the peer name do nothing, just check whether
// it haven't changed.
zmq_assert (peer_identity.size () == peer_identity_size_);
zmq_assert (memcmp (peer_identity.data (), peer_identity_,
peer_identity_size_) == 0);
}
else if (peer_identity_size_) {
// Remember the peer identity.
peer_identity.assign ((char*) peer_identity_, peer_identity_size_);
// If the session is not registered with the ordinal, let's register
// it using the peer name.
if (!ordinal) {
if (!owner->register_session (peer_identity_size_, peer_identity_,
this)) {
// TODO: There's already a session with the specified
// identity. We should presumably syslog it and drop the
// session.
zmq_assert (false);
}
}
}
// If session is created by 'connect' function, it has the pipes set
......@@ -204,37 +257,8 @@ void zmq::session_t::process_plug ()
send_bind (owner, outbound ? &outbound->reader : NULL,
inbound ? &inbound->writer : NULL);
}
}
void zmq::session_t::process_unplug ()
{
// Unregister the session from the socket. There's nothing to do here
// for transient sessions.
if (type == unnamed)
owner->unregister_session (ordinal);
else if (type == named)
owner->unregister_session (peer_identity.c_str ());
// Ask associated pipes to terminate.
if (in_pipe) {
in_pipe->term ();
in_pipe = NULL;
}
if (out_pipe) {
out_pipe->term ();
out_pipe = NULL;
}
if (engine) {
engine->unplug ();
delete engine;
engine = NULL;
}
}
void zmq::session_t::process_attach (i_engine *engine_,
unsigned char peer_identity_size_, unsigned char *peer_identity_)
{
// Plug in the engine.
zmq_assert (!engine);
zmq_assert (engine_);
engine = engine_;
......
......@@ -81,20 +81,12 @@ namespace zmq
struct i_engine *engine;
enum {
transient,
named,
unnamed
} type;
// Session is identified by ordinal in the case when it was created
// before connection to the peer was established and thus we are
// unaware of peer's identity.
uint64_t ordinal;
// Identity of the peer. If the peer is anonymous, unique name is
// generated instead. Peer identity (or the generated name) is used
// register the session with socket-level repository of sessions.
// Identity of the peer.
std::string peer_identity;
// Inherited socket options.
......
......@@ -267,7 +267,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
send_attach (session, pgm_sender);
send_attach (session, pgm_sender, 0, NULL);
}
else if (options.requires_in) {
......@@ -282,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
send_attach (session, pgm_receiver);
send_attach (session, pgm_receiver, 0, NULL);
}
else
zmq_assert (false);
......@@ -454,30 +454,33 @@ bool zmq::socket_base_t::has_out ()
return xhas_out ();
}
bool zmq::socket_base_t::register_session (const char *name_,
session_t *session_)
bool zmq::socket_base_t::register_session (unsigned char peer_identity_size_,
unsigned char *peer_identity_, session_t *session_)
{
sessions_sync.lock ();
bool registered =
named_sessions.insert (std::make_pair (name_, session_)).second;
bool registered = named_sessions.insert (std::make_pair (std::string (
(char*) peer_identity_, peer_identity_size_), session_)).second;
sessions_sync.unlock ();
return registered;
}
void zmq::socket_base_t::unregister_session (const char *name_)
void zmq::socket_base_t::unregister_session (unsigned char peer_identity_size_,
unsigned char *peer_identity_)
{
sessions_sync.lock ();
named_sessions_t::iterator it = named_sessions.find (name_);
named_sessions_t::iterator it = named_sessions.find (std::string (
(char*) peer_identity_, peer_identity_size_));
zmq_assert (it != named_sessions.end ());
named_sessions.erase (it);
sessions_sync.unlock ();
}
zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
zmq::session_t *zmq::socket_base_t::find_session (
unsigned char peer_identity_size_, unsigned char *peer_identity_)
{
sessions_sync.lock ();
named_sessions_t::iterator it = named_sessions.find (name_);
named_sessions_t::iterator it = named_sessions.find (std::string (
(char*) peer_identity_, peer_identity_size_));
if (it == named_sessions.end ()) {
sessions_sync.unlock ();
return NULL;
......
......@@ -78,9 +78,12 @@ namespace zmq
// There are two distinct types of sessions: those identified by name
// and those identified by ordinal number. Thus two sets of session
// management functions.
bool register_session (const char *name_, class session_t *session_);
void unregister_session (const char *name_);
class session_t *find_session (const char *name_);
bool register_session (unsigned char peer_identity_size_,
unsigned char *peer_identity_, class session_t *session_);
void unregister_session (unsigned char peer_identity_size_,
unsigned char *peer_identity_);
class session_t *find_session (unsigned char peer_identity_size_,
unsigned char *peer_identity_);
uint64_t register_session (class session_t *session_);
void unregister_session (uint64_t ordinal_);
class session_t *find_session (uint64_t ordinal_);
......
......@@ -164,7 +164,9 @@ void zmq::zmq_init_t::finalise ()
// If the peer has a unique name, find the associated session. If it
// doesn't exist, create it.
else if (!peer_identity.empty ()) {
session = owner->find_session (peer_identity.c_str ());
session = owner->find_session (
(unsigned char) peer_identity.size (),
(unsigned char*) peer_identity.data ());
if (!session) {
session = new (std::nothrow) session_t (
choose_io_thread (options.affinity), owner, options,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment