Commit 3825f3b3 authored by Mark Barbisan's avatar Mark Barbisan

Add support to the ROUTER socket to reassign identities upon name collision.

parent 2a03d541
...@@ -292,6 +292,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -292,6 +292,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_REQ_RELAXED 53 #define ZMQ_REQ_RELAXED 53
#define ZMQ_CONFLATE 54 #define ZMQ_CONFLATE 54
#define ZMQ_ZAP_DOMAIN 55 #define ZMQ_ZAP_DOMAIN 55
#define ZMQ_ROUTER_REASSIGN_IDENTITES 56
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
......
...@@ -35,7 +35,8 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -35,7 +35,8 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
mandatory (false), mandatory (false),
// raw_sock functionality in ROUTER is deprecated // raw_sock functionality in ROUTER is deprecated
raw_sock (false), raw_sock (false),
probe_router (false) probe_router (false),
reassign_identities(false)
{ {
options.type = ZMQ_ROUTER; options.type = ZMQ_ROUTER;
options.recv_identity = true; options.recv_identity = true;
...@@ -111,6 +112,12 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, ...@@ -111,6 +112,12 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
return 0; return 0;
} }
break; break;
case ZMQ_ROUTER_REASSIGN_IDENTITES:
if (is_int && value >= 0) {
reassign_identities = (value != 0);
return 0;
}
break;
default: default:
break; break;
...@@ -394,10 +401,38 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -394,10 +401,38 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
outpipes_t::iterator it = outpipes.find (identity); outpipes_t::iterator it = outpipes.find (identity);
msg.close (); msg.close ();
// Ignore peers with duplicate ID.
if (it != outpipes.end ()) if (it != outpipes.end ())
{
if (!reassign_identities) {
// Ignore peers with duplicate ID.
return false; return false;
} }
else
{
// We will allow the new connection to take over this
// identity. Temporarily assign a new identity to the
// existing pipe so we can terminate it asynchronously.
unsigned char buf [5];
buf [0] = 0;
put_uint32 (buf + 1, next_peer_id++);
blob_t new_identity = blob_t (buf, sizeof buf);
it->second.pipe->set_identity (new_identity);
outpipe_t existing_outpipe =
{it->second.pipe, it->second.active};
ok = outpipes.insert (outpipes_t::value_type (
new_identity, existing_outpipe)).second;
zmq_assert (ok);
// Remove the existing identity entry to allow the new
// connection to take the identity.
outpipes.erase (it);
existing_outpipe.pipe->terminate (true);
}
}
}
} }
pipe_->set_identity (identity); pipe_->set_identity (identity);
......
...@@ -115,6 +115,11 @@ namespace zmq ...@@ -115,6 +115,11 @@ namespace zmq
// if true, send an empty message to every connected router peer // if true, send an empty message to every connected router peer
bool probe_router; bool probe_router;
// If true, the router will reassign an identity upon encountering a
// name collision. The new pipe will take the identity, the old pipe
// will be terminated.
bool reassign_identities;
router_t (const router_t&); router_t (const router_t&);
const router_t &operator = (const router_t&); const router_t &operator = (const router_t&);
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment