Commit 5c6e2788 authored by Ian Barber's avatar Ian Barber

Merge pull request #301 from hurtonm/router_rework_peer_identification

router: reimplement peer identification
parents 476c9b97 f037290d
...@@ -29,7 +29,8 @@ ...@@ -29,7 +29,8 @@
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_), socket_base_t (parent_, tid_, sid_),
prefetched (0), prefetched (false),
identity_sent (false),
more_in (false), more_in (false),
current_out (NULL), current_out (NULL),
more_out (false), more_out (false),
...@@ -47,12 +48,15 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -47,12 +48,15 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
options.send_identity = true; options.send_identity = true;
options.recv_identity = true; options.recv_identity = true;
prefetched_id.init ();
prefetched_msg.init (); prefetched_msg.init ();
} }
zmq::router_t::~router_t () zmq::router_t::~router_t ()
{ {
zmq_assert (anonymous_pipes.empty ());;
zmq_assert (outpipes.empty ()); zmq_assert (outpipes.empty ());
prefetched_id.close ();
prefetched_msg.close (); prefetched_msg.close ();
} }
...@@ -60,22 +64,11 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) ...@@ -60,22 +64,11 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
// Generate a new unique peer identity. bool identity_ok = identify_peer (pipe_);
unsigned char buf [5]; if (identity_ok)
buf [0] = 0; fq.attach (pipe_);
put_uint32 (buf + 1, next_peer_id); else
blob_t identity (buf, 5); anonymous_pipes.insert (pipe_);
++next_peer_id;
// Add the pipe to the map out outbound pipes.
outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (
identity, outpipe)).second;
zmq_assert (ok);
// Add the pipe to the list of inbound pipes.
pipe_->set_identity (identity);
fq.attach (pipe_);
} }
int zmq::router_t::xsetsockopt (int option_, const void *optval_, int zmq::router_t::xsetsockopt (int option_, const void *optval_,
...@@ -85,34 +78,39 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, ...@@ -85,34 +78,39 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) { if (optvallen_ != sizeof (int) || *static_cast <const int*> (optval_) < 0) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
fail_unroutable = *((const int*) optval_); fail_unroutable = *static_cast <const int*> (optval_);
return 0; return 0;
} }
void zmq::router_t::xterminated (pipe_t *pipe_) void zmq::router_t::xterminated (pipe_t *pipe_)
{ {
fq.terminated (pipe_); std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
if (it != anonymous_pipes.end ())
for (outpipes_t::iterator it = outpipes.begin (); anonymous_pipes.erase (it);
it != outpipes.end (); ++it) { else {
if (it->second.pipe == pipe_) { outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
outpipes.erase (it); zmq_assert (it != outpipes.end ());
if (pipe_ == current_out) outpipes.erase (it);
current_out = NULL; fq.terminated (pipe_);
return; if (pipe_ == current_out)
} current_out = NULL;
} }
// We should never get here
zmq_assert (false);
} }
void zmq::router_t::xread_activated (pipe_t *pipe_) void zmq::router_t::xread_activated (pipe_t *pipe_)
{ {
fq.activated (pipe_); std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
if (it == anonymous_pipes.end ())
fq.activated (pipe_);
else {
bool identity_ok = identify_peer (pipe_);
if (identity_ok)
anonymous_pipes.erase (it);
}
} }
void zmq::router_t::xwrite_activated (pipe_t *pipe_) void zmq::router_t::xwrite_activated (pipe_t *pipe_)
...@@ -198,85 +196,51 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) ...@@ -198,85 +196,51 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
int zmq::router_t::xrecv (msg_t *msg_, int flags_) int zmq::router_t::xrecv (msg_t *msg_, int flags_)
{ {
// if there is a prefetched identity, return it. if (prefetched) {
if (prefetched == 2) if (!identity_sent) {
{ int rc = msg_->move (prefetched_id);
int rc = msg_->init_size (prefetched_id.size ()); errno_assert (rc == 0);
errno_assert (rc == 0); identity_sent = true;
memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ()); }
msg_->set_flags (msg_t::more); else {
prefetched = 1; int rc = msg_->move (prefetched_msg);
return 0; errno_assert (rc == 0);
} prefetched = false;
}
// If there is a prefetched message, return it.
if (prefetched == 1) {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
more_in = msg_->flags () & msg_t::more ? true : false; more_in = msg_->flags () & msg_t::more ? true : false;
prefetched = 0;
return 0; return 0;
} }
pipe_t *pipe = NULL; pipe_t *pipe = NULL;
while (true) { int rc = fq.recvpipe (msg_, flags_, &pipe);
if (rc != 0) {
// Get next message part. errno = EAGAIN;
int rc = fq.recvpipe (msg_, flags_, &pipe); return -1;
if (rc != 0)
return -1;
// If identity is received, change the key assigned to the pipe.
if (likely (!(msg_->flags () & msg_t::identity)))
break;
zmq_assert (!more_in);
// Empty identity means we can preserve the auto-generated identity
if (msg_->size ()) {
blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
outpipes_t::iterator it = outpipes.find (identity);
if (it == outpipes.end ()) {
// Find the pipe and change its identity
bool changed = false;
it = outpipes.begin ();
while (it != outpipes.end ()) {
if (it->second.pipe == pipe) {
pipe->set_identity (identity);
outpipes.erase (it);
outpipe_t outpipe = {pipe, true};
if (!outpipes.insert (
outpipes_t::value_type (identity, outpipe)).second)
zmq_assert (false);
changed = true;
break;
}
++it;
}
zmq_assert (changed);
}
}
} }
// Identity is not expected
assert ((msg_->flags () & msg_t::identity) == 0);
assert (pipe != NULL);
// If we are in the middle of reading a message, just return the next part. // If we are in the middle of reading a message, just return the next part.
if (more_in) { if (more_in)
more_in = msg_->flags () & msg_t::more ? true : false; more_in = msg_->flags () & msg_t::more ? true : false;
return 0; else {
// We are at the beginning of a message.
// Keep the message part we have in the prefetch buffer
// and return the ID of the peer instead.
rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0);
prefetched = true;
blob_t identity = pipe->get_identity ();
rc = msg_->init_size (identity.size ());
errno_assert (rc == 0);
memcpy (msg_->data (), identity.data (), identity.size ());
msg_->set_flags (msg_t::more);
identity_sent = true;
} }
// We are at the beginning of a new message. Move the message part we
// have to the prefetched and return the ID of the peer instead.
int rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0);
prefetched = 1;
rc = msg_->close ();
errno_assert (rc == 0);
blob_t identity = pipe->get_identity ();
rc = msg_->init_size (identity.size ());
errno_assert (rc == 0);
memcpy (msg_->data (), identity.data (), identity.size ());
msg_->set_flags (msg_t::more);
return 0; return 0;
} }
...@@ -292,31 +256,33 @@ int zmq::router_t::rollback (void) ...@@ -292,31 +256,33 @@ int zmq::router_t::rollback (void)
bool zmq::router_t::xhas_in () bool zmq::router_t::xhas_in ()
{ {
// If we are in the middle of reading the messages, there are // If we are in the middle of reading the messages, there are
// definitely more parts available. // definitely more parts available.
if (more_in) if (more_in)
return true; return true;
// We may already have a message pre-fetched. // We may already have a message pre-fetched.
if (prefetched > 0) if (prefetched)
return true; return true;
// Try to read the next message to the pre-fetch buffer. If anything, // Try to read the next message.
// it will be identity of the peer sending the message. // The message, if read, is kept in the pre-fetch buffer.
msg_t id; pipe_t *pipe = NULL;
id.init (); int rc = fq.recvpipe (&prefetched_msg, ZMQ_DONTWAIT, &pipe);
int rc = router_t::xrecv (&id, ZMQ_DONTWAIT); if (rc != 0)
if (rc != 0 && errno == EAGAIN) {
id.close ();
return false; return false;
}
zmq_assert (rc == 0);
// We have first part of the message prefetched now. We will store the // Identity is not expected
// prefetched identity as well. assert ((prefetched_msg.flags () & msg_t::identity) == 0);
prefetched_id.assign ((unsigned char*) id.data (), id.size ());
id.close (); blob_t identity = pipe->get_identity ();
prefetched = 2; rc = prefetched_id.init_size (identity.size ());
errno_assert (rc == 0);
memcpy (prefetched_id.data (), identity.data (), identity.size ());
prefetched_id.set_flags (msg_t::more);
prefetched = true;
identity_sent = false;
return true; return true;
} }
...@@ -329,6 +295,43 @@ bool zmq::router_t::xhas_out () ...@@ -329,6 +295,43 @@ bool zmq::router_t::xhas_out ()
return true; return true;
} }
bool zmq::router_t::identify_peer (pipe_t *pipe_)
{
msg_t msg;
blob_t identity;
msg.init ();
bool ok = pipe_->read (&msg);
if (!ok)
return false;
if (msg.size () == 0) {
// Fall back on the auto-generation
unsigned char buf [5];
buf [0] = 0;
put_uint32 (buf + 1, next_peer_id++);
identity = blob_t (buf, sizeof buf);
msg.close ();
}
else {
identity = blob_t ((unsigned char*) msg.data (), msg.size ());
outpipes_t::iterator it = outpipes.find (identity);
msg.close ();
// Ignore peers with duplicate ID.
if (it != outpipes.end ())
return false;
}
pipe_->set_identity (identity);
// Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true};
ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
zmq_assert (ok);
return true;
}
zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_, zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const address_t *addr_) : const address_t *addr_) :
......
...@@ -65,15 +65,21 @@ namespace zmq ...@@ -65,15 +65,21 @@ namespace zmq
private: private:
// Receive peer id and update lookup map
bool identify_peer (pipe_t *pipe_);
// Fair queueing object for inbound pipes. // Fair queueing object for inbound pipes.
fq_t fq; fq_t fq;
// This value is either 0 (nothing is prefetched), 1 (only message body // True iff there is a message held in the pre-fetch buffer.
// is prefetched) or 2 (both identity and message body are prefetched). bool prefetched;
int prefetched;
// If true, the receiver got the message part with
// the peer's identity.
bool identity_sent;
// Holds the prefetched identity. // Holds the prefetched identity.
blob_t prefetched_id; msg_t prefetched_id;
// Holds the prefetched message. // Holds the prefetched message.
msg_t prefetched_msg; msg_t prefetched_msg;
...@@ -87,6 +93,9 @@ namespace zmq ...@@ -87,6 +93,9 @@ namespace zmq
bool active; bool active;
}; };
// We keep a set of pipes that have not been identified yet.
std::set <pipe_t*> anonymous_pipes;
// Outbound pipes indexed by the peer IDs. // Outbound pipes indexed by the peer IDs.
typedef std::map <blob_t, outpipe_t> outpipes_t; typedef std::map <blob_t, outpipe_t> outpipes_t;
outpipes_t outpipes; outpipes_t outpipes;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment