Commit 923eacd2 authored by Martin Sustrik's avatar Martin Sustrik

Multi-hop REQ/REP, part VI., session 'name' renamed to 'peer_identity'

parent 2e78e485
...@@ -42,23 +42,23 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, ...@@ -42,23 +42,23 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
} }
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_, const char *name_) : const options_t &options_, unsigned char peer_identity_size_,
unsigned char *peer_identity_) :
owned_t (parent_, owner_), owned_t (parent_, owner_),
in_pipe (NULL), in_pipe (NULL),
active (true), active (true),
out_pipe (NULL), out_pipe (NULL),
engine (NULL), engine (NULL),
ordinal (0),
options (options_) options (options_)
{ {
if (name_) { if (peer_identity_size_) {
type = named; type = named;
name = name_; peer_identity.assign ((char*) peer_identity_, peer_identity_size_);
ordinal = 0;
} }
else { else {
type = transient; type = transient;
// TODO: Generate unique name here. // TODO: Generate unique name here.
ordinal = 0;
} }
} }
...@@ -169,8 +169,8 @@ void zmq::session_t::revive (reader_t *pipe_) ...@@ -169,8 +169,8 @@ void zmq::session_t::revive (reader_t *pipe_)
void zmq::session_t::process_plug () void zmq::session_t::process_plug ()
{ {
// Register the session with the socket. // Register the session with the socket.
if (!name.empty ()) { if (!peer_identity.empty ()) {
bool ok = owner->register_session (name.c_str (), this); bool ok = owner->register_session (peer_identity.c_str (), this);
// There's already a session with the specified identity. // There's already a session with the specified identity.
// We should syslog it and drop the session. TODO // We should syslog it and drop the session. TODO
...@@ -213,7 +213,7 @@ void zmq::session_t::process_unplug () ...@@ -213,7 +213,7 @@ void zmq::session_t::process_unplug ()
if (type == unnamed) if (type == unnamed)
owner->unregister_session (ordinal); owner->unregister_session (ordinal);
else if (type == named) else if (type == named)
owner->unregister_session (name.c_str ()); owner->unregister_session (peer_identity.c_str ());
// Ask associated pipes to terminate. // Ask associated pipes to terminate.
if (in_pipe) { if (in_pipe) {
......
...@@ -41,7 +41,8 @@ namespace zmq ...@@ -41,7 +41,8 @@ namespace zmq
// Creates named session. If name is NULL, transient session with // Creates named session. If name is NULL, transient session with
// auto-generated name is created. // auto-generated name is created.
session_t (object_t *parent_, socket_base_t *owner_, session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_, const char *name_); const options_t &options_, unsigned char peer_identity_size_,
unsigned char *peer_identity_);
// i_inout interface implementation. // i_inout interface implementation.
bool read (::zmq_msg_t *msg_); bool read (::zmq_msg_t *msg_);
...@@ -86,12 +87,15 @@ namespace zmq ...@@ -86,12 +87,15 @@ namespace zmq
unnamed unnamed
} type; } type;
// Ordinal of the session (if any). // Session is identified by ordinal in the case when it was created
// before connection to the peer was established and thus we are
// unaware of peer's identity.
uint64_t ordinal; uint64_t ordinal;
// The name of the session. One that is used to register it with // Identity of the peer. If the peer is anonymous, unique name is
// socket-level repository of sessions. // generated instead. Peer identity (or the generated name) is used
std::string name; // register the session with socket-level repository of sessions.
std::string peer_identity;
// Inherited socket options. // Inherited socket options.
options_t options; options_t options;
......
...@@ -168,7 +168,8 @@ void zmq::zmq_init_t::finalise () ...@@ -168,7 +168,8 @@ void zmq::zmq_init_t::finalise ()
if (!session) { if (!session) {
session = new (std::nothrow) session_t ( session = new (std::nothrow) session_t (
choose_io_thread (options.affinity), owner, options, choose_io_thread (options.affinity), owner, options,
peer_identity.c_str ()); (unsigned char) peer_identity.size (),
(unsigned char*) peer_identity.c_str ());
zmq_assert (session); zmq_assert (session);
send_plug (session); send_plug (session);
send_own (owner, session); send_own (owner, session);
...@@ -182,7 +183,7 @@ void zmq::zmq_init_t::finalise () ...@@ -182,7 +183,7 @@ void zmq::zmq_init_t::finalise ()
// transient session. // transient session.
else { else {
session = new (std::nothrow) session_t ( session = new (std::nothrow) session_t (
choose_io_thread (options.affinity), owner, options, NULL); choose_io_thread (options.affinity), owner, options, 0, NULL);
zmq_assert (session); zmq_assert (session);
send_plug (session); send_plug (session);
send_own (owner, session); send_own (owner, session);
...@@ -191,7 +192,7 @@ void zmq::zmq_init_t::finalise () ...@@ -191,7 +192,7 @@ void zmq::zmq_init_t::finalise ()
session->inc_seqnum (); session->inc_seqnum ();
} }
// No need to increment seqnum as it was laready incremented above. // No need to increment seqnum as it was already incremented above.
send_attach (session, engine, (unsigned char) peer_identity.size (), send_attach (session, engine, (unsigned char) peer_identity.size (),
(unsigned char*) peer_identity.data (), false); (unsigned char*) peer_identity.data (), false);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment