Commit 32ded2b4 authored by Martin Sustrik's avatar Martin Sustrik

Duplicate identities now checked with zmq_connect

Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent b79d07b8
......@@ -28,12 +28,15 @@ zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
const char *protocol_, const char *address_) :
session_t (io_thread_, socket_, options_),
protocol (protocol_),
address (address_)
address (address_),
connected (false)
{
}
zmq::connect_session_t::~connect_session_t ()
{
if (connected && !peer_identity.empty ())
unregister_session (peer_identity);
}
void zmq::connect_session_t::process_plug ()
......@@ -107,8 +110,46 @@ void zmq::connect_session_t::start_connecting (bool wait_)
zmq_assert (false);
}
void zmq::connect_session_t::attached (const blob_t &peer_identity_)
bool zmq::connect_session_t::attached (const blob_t &peer_identity_)
{
// If there was no previous connection...
if (!connected) {
// Peer has transient identity.
if (peer_identity_.empty () || peer_identity_ [0] == 0) {
connected = true;
return true;
}
// Peer has strong identity. Let's register it and check whether noone
// else is using the same identity.
if (!register_session (peer_identity_, this)) {
log ("DPID: duplicate peer identity - disconnecting peer");
return false;
}
connected = true;
peer_identity = peer_identity_;
return true;
}
// New engine from listener can conflict with existing engine.
// Alternatively, new engine created by reconnection process can
// conflict with engine supplied by listener in the meantime.
if (has_engine ()) {
log ("DPID: duplicate peer identity - disconnecting peer");
return false;
}
// If there have been a connection before, we have to check whether
// peer's identity haven't changed in the meantime.
if ((peer_identity_.empty () || peer_identity_ [0] == 0) &&
peer_identity.empty ())
return true;
if (peer_identity != peer_identity_) {
log ("CHID: peer have changed identity - disconnecting peer");
return false;
}
return true;
}
void zmq::connect_session_t::detached ()
......
......@@ -23,6 +23,7 @@
#include <string>
#include "blob.hpp"
#include "session.hpp"
namespace zmq
......@@ -43,7 +44,7 @@ namespace zmq
private:
// Handlers for events from session base class.
void attached (const blob_t &peer_identity_);
bool attached (const blob_t &peer_identity_);
void detached ();
// Start the connection process.
......@@ -56,6 +57,13 @@ namespace zmq
std::string protocol;
std::string address;
// If true, the session was already connected to the peer.
bool connected;
// Identity of the peer. If 'connected' is false, it has no meaning.
// Otherwise, if it's empty, the peer has transient identity.
blob_t peer_identity;
connect_session_t (const connect_session_t&);
const connect_session_t &operator = (const connect_session_t&);
};
......
......@@ -45,11 +45,17 @@ zmq::named_session_t::~named_session_t ()
unregister_session (peer_identity);
}
void zmq::named_session_t::attached (const blob_t &peer_identity_)
bool zmq::named_session_t::attached (const blob_t &peer_identity_)
{
// The owner should take care to not attach the session
// to an unrelated peer.
// Double check that identities match.
zmq_assert (peer_identity == peer_identity_);
// If the session already has an engine attached, destroy new one.
if (has_engine ()) {
log ("DPID: duplicate peer identity - disconnecting peer");
return false;
}
return true;
}
void zmq::named_session_t::detached ()
......
......@@ -40,7 +40,7 @@ namespace zmq
~named_session_t ();
// Handlers for events from session base class.
void attached (const blob_t &peer_identity_);
bool attached (const blob_t &peer_identity_);
void detached ();
private:
......
......@@ -214,26 +214,25 @@ void zmq::session_t::process_plug ()
void zmq::session_t::process_attach (i_engine *engine_,
const blob_t &peer_identity_)
{
// If some other object (e.g. init) notifies us that the connection failed
// we need to start the reconnection process.
if (!engine_) {
zmq_assert (!engine);
detached ();
return;
}
// If we are already terminating, we destroy the engine straight away.
// Note that we don't have to unplug it before deleting as it's not
// yet plugged to the session.
if (state == terminating) {
delete engine_;
if (engine_)
delete engine_;
return;
}
// If the session already has an engine attached, destroy new one.
// Note new engine is not plugged in yet, we don't have to unplug it.
if (engine) {
log ("DPID: duplicate peer identity - disconnecting peer");
// If some other object (e.g. init) notifies us that the connection failed
// without creating an engine we need to start the reconnection process.
if (!engine_) {
zmq_assert (!engine);
detached ();
return;
}
// Trigger the notfication event about the attachment.
if (!attached (peer_identity_)) {
delete engine_;
return;
}
......@@ -248,8 +247,8 @@ void zmq::session_t::process_attach (i_engine *engine_,
// Create the pipes, as required.
if (options.requires_in) {
create_pipe (socket, this, options.hwm, options.swap, &socket_reader,
&out_pipe);
create_pipe (socket, this, options.hwm, options.swap,
&socket_reader, &out_pipe);
out_pipe->set_event_sink (this);
}
if (options.requires_out) {
......@@ -264,11 +263,9 @@ void zmq::session_t::process_attach (i_engine *engine_,
}
// Plug in the engine.
zmq_assert (!engine);
engine = engine_;
engine->plug (io_thread, this);
// Trigger the notfication about the attachment.
attached (peer_identity_);
}
void zmq::session_t::detach ()
......@@ -330,6 +327,11 @@ void zmq::session_t::timer_event (int id_)
proceed_with_term ();
}
bool zmq::session_t::has_engine ()
{
return engine != NULL;
}
bool zmq::session_t::register_session (const blob_t &name_, session_t *session_)
{
return socket->register_session (name_, session_);
......
......@@ -69,12 +69,16 @@ namespace zmq
void terminate ();
// Two events for the derived session type. Attached is triggered
// when session is attached to a peer, detached is triggered at the
// beginning of the termination process when session is about to
// be detached from the peer.
virtual void attached (const blob_t &peer_identity_) = 0;
// when session is attached to a peer. The function can reject the new
// peer by returning false. Detached is triggered at the beginning of
// the termination process when session is about to be detached from
// the peer.
virtual bool attached (const blob_t &peer_identity_) = 0;
virtual void detached () = 0;
// Returns true if there is an engine attached to the session.
bool has_engine ();
// Allows derives session types to (un)register session names.
bool register_session (const blob_t &name_, class session_t *session_);
void unregister_session (const blob_t &name_);
......
......@@ -619,7 +619,9 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
session_t *session = it->second;
// Prepare the session for subsequent attach command.
session->inc_seqnum ();
// Note the connect sessions have NULL pointers registered here.
if (session)
session->inc_seqnum ();
sessions_sync.unlock ();
return session;
......
......@@ -30,8 +30,10 @@ zmq::transient_session_t::~transient_session_t ()
{
}
void zmq::transient_session_t::attached (const blob_t &peer_identity_)
bool zmq::transient_session_t::attached (const blob_t &peer_identity_)
{
// Transient session is always valid.
return true;
}
void zmq::transient_session_t::detached ()
......
......@@ -40,7 +40,7 @@ namespace zmq
private:
// Handlers for events from session base class.
void attached (const blob_t &peer_identity_);
bool attached (const blob_t &peer_identity_);
void detached ();
transient_session_t (const transient_session_t&);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment