Commit aaeae8de authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #841 from Prarrot/master

ZMQ_CONNECT_RID tests and man
parents 2f854472 3fbc10eb
...@@ -622,6 +622,7 @@ set(tests ...@@ -622,6 +622,7 @@ set(tests
test_timeo test_timeo
test_many_sockets test_many_sockets
test_diffserv test_diffserv
test_connect_rid
) )
if(NOT WIN32) if(NOT WIN32)
list(APPEND tests list(APPEND tests
......
...@@ -67,6 +67,32 @@ Default value:: 100 ...@@ -67,6 +67,32 @@ Default value:: 100
Applicable socket types:: all, only for connection-oriented transports. Applicable socket types:: all, only for connection-oriented transports.
ZMQ_CONNECT_RID: Assign the next outbound connection id
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_CONNECT_RID' option sets the peer id of the next host connected
via the zmq_connect() call, and immediately readies that connection for
data transfer with the named id. This option applies only to the first
subsequent call to zmq_connect(), calls thereafter use default connection
behavior.
Typical use is to set this socket option on each zmq_connect() attempt
to a new host. Each connection should be assigned a unique name. Duplicated
names will trigger default connection behavior.
Useful when connecting ROUTER to ROUTER, or STREAM to STREAM, as it
allows for immediate sending to peers. Outbound id framing requirements
for ROUTER and STREAM sockets apply.
The peer id should be from 1 to 255 bytes long and MAY NOT start with
binary zero.
[horizontal]
Option value type:: binary data
Option value unit:: N/A
Default value:: NULL
Applicable socket types:: ZMQ_ROUTER, ZMQ_STREAM
ZMQ_CONFLATE: Keep only last message ZMQ_CONFLATE: Keep only last message
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If set, a socket shall keep only one message in its inbound/outbound If set, a socket shall keep only one message in its inbound/outbound
......
...@@ -387,13 +387,15 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -387,13 +387,15 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
msg_t msg; msg_t msg;
blob_t identity; blob_t identity;
bool ok; bool ok;
bool connect_rid_used = false;
if (connect_rid.length()) { if (connect_rid.length()) {
identity = blob_t ((unsigned char*) connect_rid.c_str (), identity = blob_t ((unsigned char*) connect_rid.c_str (),
connect_rid.length()); connect_rid.length());
connect_rid.clear (); connect_rid.clear ();
connect_rid_used = true; outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ()) {
return false; // duplicate connection
}
} }
else else
if (options.raw_sock) { // Always assign identity for raw-socket if (options.raw_sock) { // Always assign identity for raw-socket
...@@ -402,6 +404,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -402,6 +404,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
put_uint32 (buf + 1, next_rid++); put_uint32 (buf + 1, next_rid++);
identity = blob_t (buf, sizeof buf); identity = blob_t (buf, sizeof buf);
} }
else
if (!options.raw_sock) { if (!options.raw_sock) {
// Pick up handshake cases and also case where next identity is set // Pick up handshake cases and also case where next identity is set
msg.init (); msg.init ();
...@@ -409,9 +412,6 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -409,9 +412,6 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
if (!ok) if (!ok)
return false; return false;
if (connect_rid_used) // we read but do not use identity from peer
msg.close();
else
if (msg.size () == 0) { if (msg.size () == 0) {
// Fall back on the auto-generation // Fall back on the auto-generation
unsigned char buf [5]; unsigned char buf [5];
......
...@@ -266,10 +266,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) ...@@ -266,10 +266,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
identity = blob_t ((unsigned char*) connect_rid.c_str(), identity = blob_t ((unsigned char*) connect_rid.c_str(),
connect_rid.length ()); connect_rid.length ());
connect_rid.clear (); connect_rid.clear ();
outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ())
goto d;
} }
else { else {
put_uint32 (buffer + 1, next_rid++); d: put_uint32 (buffer + 1, next_rid++);
blob_t identity = blob_t (buffer, sizeof buffer); identity = blob_t (buffer, sizeof buffer);
memcpy (options.identity, identity.data (), identity.size ()); memcpy (options.identity, identity.data (), identity.size ());
options.identity_size = identity.size (); options.identity_size = identity.size ();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment