Commit f037290d authored by Martin Hurton's avatar Martin Hurton

router: reimplement peer identification

The new implementation allows one to send messages through
a router socket to a peer even before receiving
messages from this peer.

Fixes issue #304
parent 476c9b97
......@@ -29,7 +29,8 @@
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
prefetched (0),
prefetched (false),
identity_sent (false),
more_in (false),
current_out (NULL),
more_out (false),
......@@ -47,12 +48,15 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
options.send_identity = true;
options.recv_identity = true;
prefetched_id.init ();
prefetched_msg.init ();
}
zmq::router_t::~router_t ()
{
zmq_assert (anonymous_pipes.empty ());;
zmq_assert (outpipes.empty ());
prefetched_id.close ();
prefetched_msg.close ();
}
......@@ -60,22 +64,11 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
zmq_assert (pipe_);
// Generate a new unique peer identity.
unsigned char buf [5];
buf [0] = 0;
put_uint32 (buf + 1, next_peer_id);
blob_t identity (buf, 5);
++next_peer_id;
// Add the pipe to the map out outbound pipes.
outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (
identity, outpipe)).second;
zmq_assert (ok);
// Add the pipe to the list of inbound pipes.
pipe_->set_identity (identity);
bool identity_ok = identify_peer (pipe_);
if (identity_ok)
fq.attach (pipe_);
else
anonymous_pipes.insert (pipe_);
}
int zmq::router_t::xsetsockopt (int option_, const void *optval_,
......@@ -85,34 +78,39 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
errno = EINVAL;
return -1;
}
if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) {
if (optvallen_ != sizeof (int) || *static_cast <const int*> (optval_) < 0) {
errno = EINVAL;
return -1;
}
fail_unroutable = *((const int*) optval_);
fail_unroutable = *static_cast <const int*> (optval_);
return 0;
}
void zmq::router_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) {
if (it->second.pipe == pipe_) {
std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
if (it != anonymous_pipes.end ())
anonymous_pipes.erase (it);
else {
outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
zmq_assert (it != outpipes.end ());
outpipes.erase (it);
fq.terminated (pipe_);
if (pipe_ == current_out)
current_out = NULL;
return;
}
}
// We should never get here
zmq_assert (false);
}
void zmq::router_t::xread_activated (pipe_t *pipe_)
{
std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
if (it == anonymous_pipes.end ())
fq.activated (pipe_);
else {
bool identity_ok = identify_peer (pipe_);
if (identity_ok)
anonymous_pipes.erase (it);
}
}
void zmq::router_t::xwrite_activated (pipe_t *pipe_)
......@@ -198,85 +196,51 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
int zmq::router_t::xrecv (msg_t *msg_, int flags_)
{
// if there is a prefetched identity, return it.
if (prefetched == 2)
{
int rc = msg_->init_size (prefetched_id.size ());
if (prefetched) {
if (!identity_sent) {
int rc = msg_->move (prefetched_id);
errno_assert (rc == 0);
memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ());
msg_->set_flags (msg_t::more);
prefetched = 1;
return 0;
identity_sent = true;
}
// If there is a prefetched message, return it.
if (prefetched == 1) {
else {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
prefetched = false;
}
more_in = msg_->flags () & msg_t::more ? true : false;
prefetched = 0;
return 0;
}
pipe_t *pipe = NULL;
while (true) {
// Get next message part.
int rc = fq.recvpipe (msg_, flags_, &pipe);
if (rc != 0)
if (rc != 0) {
errno = EAGAIN;
return -1;
// If identity is received, change the key assigned to the pipe.
if (likely (!(msg_->flags () & msg_t::identity)))
break;
zmq_assert (!more_in);
// Empty identity means we can preserve the auto-generated identity
if (msg_->size ()) {
blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
outpipes_t::iterator it = outpipes.find (identity);
if (it == outpipes.end ()) {
// Find the pipe and change its identity
bool changed = false;
it = outpipes.begin ();
while (it != outpipes.end ()) {
if (it->second.pipe == pipe) {
pipe->set_identity (identity);
outpipes.erase (it);
outpipe_t outpipe = {pipe, true};
if (!outpipes.insert (
outpipes_t::value_type (identity, outpipe)).second)
zmq_assert (false);
changed = true;
break;
}
++it;
}
zmq_assert (changed);
}
}
}
// Identity is not expected
assert ((msg_->flags () & msg_t::identity) == 0);
assert (pipe != NULL);
// If we are in the middle of reading a message, just return the next part.
if (more_in) {
if (more_in)
more_in = msg_->flags () & msg_t::more ? true : false;
return 0;
}
// We are at the beginning of a new message. Move the message part we
// have to the prefetched and return the ID of the peer instead.
int rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0);
prefetched = 1;
rc = msg_->close ();
else {
// We are at the beginning of a message.
// Keep the message part we have in the prefetch buffer
// and return the ID of the peer instead.
rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0);
prefetched = true;
blob_t identity = pipe->get_identity ();
rc = msg_->init_size (identity.size ());
errno_assert (rc == 0);
memcpy (msg_->data (), identity.data (), identity.size ());
msg_->set_flags (msg_t::more);
identity_sent = true;
}
return 0;
}
......@@ -298,25 +262,27 @@ bool zmq::router_t::xhas_in ()
return true;
// We may already have a message pre-fetched.
if (prefetched > 0)
if (prefetched)
return true;
// Try to read the next message to the pre-fetch buffer. If anything,
// it will be identity of the peer sending the message.
msg_t id;
id.init ();
int rc = router_t::xrecv (&id, ZMQ_DONTWAIT);
if (rc != 0 && errno == EAGAIN) {
id.close ();
// Try to read the next message.
// The message, if read, is kept in the pre-fetch buffer.
pipe_t *pipe = NULL;
int rc = fq.recvpipe (&prefetched_msg, ZMQ_DONTWAIT, &pipe);
if (rc != 0)
return false;
}
zmq_assert (rc == 0);
// We have first part of the message prefetched now. We will store the
// prefetched identity as well.
prefetched_id.assign ((unsigned char*) id.data (), id.size ());
id.close ();
prefetched = 2;
// Identity is not expected
assert ((prefetched_msg.flags () & msg_t::identity) == 0);
blob_t identity = pipe->get_identity ();
rc = prefetched_id.init_size (identity.size ());
errno_assert (rc == 0);
memcpy (prefetched_id.data (), identity.data (), identity.size ());
prefetched_id.set_flags (msg_t::more);
prefetched = true;
identity_sent = false;
return true;
}
......@@ -329,6 +295,43 @@ bool zmq::router_t::xhas_out ()
return true;
}
bool zmq::router_t::identify_peer (pipe_t *pipe_)
{
msg_t msg;
blob_t identity;
msg.init ();
bool ok = pipe_->read (&msg);
if (!ok)
return false;
if (msg.size () == 0) {
// Fall back on the auto-generation
unsigned char buf [5];
buf [0] = 0;
put_uint32 (buf + 1, next_peer_id++);
identity = blob_t (buf, sizeof buf);
msg.close ();
}
else {
identity = blob_t ((unsigned char*) msg.data (), msg.size ());
outpipes_t::iterator it = outpipes.find (identity);
msg.close ();
// Ignore peers with duplicate ID.
if (it != outpipes.end ())
return false;
}
pipe_->set_identity (identity);
// Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true};
ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
zmq_assert (ok);
return true;
}
zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const address_t *addr_) :
......
......@@ -65,15 +65,21 @@ namespace zmq
private:
// Receive peer id and update lookup map
bool identify_peer (pipe_t *pipe_);
// Fair queueing object for inbound pipes.
fq_t fq;
// This value is either 0 (nothing is prefetched), 1 (only message body
// is prefetched) or 2 (both identity and message body are prefetched).
int prefetched;
// True iff there is a message held in the pre-fetch buffer.
bool prefetched;
// If true, the receiver got the message part with
// the peer's identity.
bool identity_sent;
// Holds the prefetched identity.
blob_t prefetched_id;
msg_t prefetched_id;
// Holds the prefetched message.
msg_t prefetched_msg;
......@@ -87,6 +93,9 @@ namespace zmq
bool active;
};
// We keep a set of pipes that have not been identified yet.
std::set <pipe_t*> anonymous_pipes;
// Outbound pipes indexed by the peer IDs.
typedef std::map <blob_t, outpipe_t> outpipes_t;
outpipes_t outpipes;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment