Commit 2f854472 authored by Ian Barber's avatar Ian Barber

Merge pull request #839 from hintjens/master

Cleaned up option to force identity on outgoing connection
parents 5f07d103 50bd28c0
...@@ -293,7 +293,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -293,7 +293,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_IPC_FILTER_PID 58 #define ZMQ_IPC_FILTER_PID 58
#define ZMQ_IPC_FILTER_UID 59 #define ZMQ_IPC_FILTER_UID 59
#define ZMQ_IPC_FILTER_GID 60 #define ZMQ_IPC_FILTER_GID 60
#define ZMQ_NEXT_CONNECT_PEER_ID 61 #define ZMQ_CONNECT_RID 61
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
#define ZMQ_SRCFD 2 #define ZMQ_SRCFD 2
......
...@@ -31,7 +31,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -31,7 +31,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
more_in (false), more_in (false),
current_out (NULL), current_out (NULL),
more_out (false), more_out (false),
next_peer_id (generate_random ()), next_rid (generate_random ()),
mandatory (false), mandatory (false),
// raw_sock functionality in ROUTER is deprecated // raw_sock functionality in ROUTER is deprecated
raw_sock (false), raw_sock (false),
...@@ -88,9 +88,9 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, ...@@ -88,9 +88,9 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
int value = is_int? *((int *) optval_): 0; int value = is_int? *((int *) optval_): 0;
switch (option_) { switch (option_) {
case ZMQ_NEXT_CONNECT_PEER_ID: case ZMQ_CONNECT_RID:
if(optval_ && optvallen_) { if (optval_ && optvallen_) {
next_identity.assign((char*)optval_,optvallen_); connect_rid.assign ((char *) optval_, optvallen_);
return 0; return 0;
} }
break; break;
...@@ -387,33 +387,36 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -387,33 +387,36 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
msg_t msg; msg_t msg;
blob_t identity; blob_t identity;
bool ok; bool ok;
bool next_identity_used = false; bool connect_rid_used = false;
if (next_identity.length()) { if (connect_rid.length()) {
identity = blob_t((unsigned char*) next_identity.c_str(), identity = blob_t ((unsigned char*) connect_rid.c_str (),
next_identity.length()); connect_rid.length());
next_identity.clear(); connect_rid.clear ();
next_identity_used = true; connect_rid_used = true;
} }
else if (options.raw_sock) { // Always assign identity for raw-socket else
if (options.raw_sock) { // Always assign identity for raw-socket
unsigned char buf [5]; unsigned char buf [5];
buf [0] = 0; buf [0] = 0;
put_uint32 (buf + 1, next_peer_id++); put_uint32 (buf + 1, next_rid++);
identity = blob_t (buf, sizeof buf); identity = blob_t (buf, sizeof buf);
} }
if (!options.raw_sock){ // pick up handshake cases and also case where next identity is set if (!options.raw_sock) {
// Pick up handshake cases and also case where next identity is set
msg.init (); msg.init ();
ok = pipe_->read (&msg); ok = pipe_->read (&msg);
if (!ok) if (!ok)
return false; return false;
if (next_identity_used){ // we read but do not use identity from peer
if (connect_rid_used) // we read but do not use identity from peer
msg.close(); msg.close();
} else
else if (msg.size () == 0) { if (msg.size () == 0) {
// Fall back on the auto-generation // Fall back on the auto-generation
unsigned char buf [5]; unsigned char buf [5];
buf [0] = 0; buf [0] = 0;
put_uint32 (buf + 1, next_peer_id++); put_uint32 (buf + 1, next_rid++);
identity = blob_t (buf, sizeof buf); identity = blob_t (buf, sizeof buf);
msg.close (); msg.close ();
} }
...@@ -432,7 +435,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -432,7 +435,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
// existing pipe so we can terminate it asynchronously. // existing pipe so we can terminate it asynchronously.
unsigned char buf [5]; unsigned char buf [5];
buf [0] = 0; buf [0] = 0;
put_uint32 (buf + 1, next_peer_id++); put_uint32 (buf + 1, next_rid++);
blob_t new_identity = blob_t (buf, sizeof buf); blob_t new_identity = blob_t (buf, sizeof buf);
it->second.pipe->set_identity (new_identity); it->second.pipe->set_identity (new_identity);
......
...@@ -104,9 +104,9 @@ namespace zmq ...@@ -104,9 +104,9 @@ namespace zmq
// If true, more outgoing message parts are expected. // If true, more outgoing message parts are expected.
bool more_out; bool more_out;
// Peer ID are generated. It's a simple increment and wrap-over // Routing IDs are generated. It's a simple increment and wrap-over
// algorithm. This value is the next ID to use (if not used already). // algorithm. This value is the next ID to use (if not used already).
uint32_t next_peer_id; uint32_t next_rid;
// If true, report EAGAIN to the caller instead of silently dropping // If true, report EAGAIN to the caller instead of silently dropping
// the message targeting an unknown peer. // the message targeting an unknown peer.
......
...@@ -164,8 +164,10 @@ namespace zmq ...@@ -164,8 +164,10 @@ namespace zmq
// Monitor socket cleanup // Monitor socket cleanup
void stop_monitor (); void stop_monitor ();
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
std::string next_identity; std::string connect_rid;
private: private:
// Creates new endpoint ID and adds the endpoint to the map. // Creates new endpoint ID and adds the endpoint to the map.
void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe); void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe);
......
...@@ -30,7 +30,7 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -30,7 +30,7 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
identity_sent (false), identity_sent (false),
current_out (NULL), current_out (NULL),
more_out (false), more_out (false),
next_peer_id (generate_random ()) next_rid (generate_random ())
{ {
options.type = ZMQ_STREAM; options.type = ZMQ_STREAM;
options.raw_sock = true; options.raw_sock = true;
...@@ -163,13 +163,14 @@ int zmq::stream_t::xsend (msg_t *msg_) ...@@ -163,13 +163,14 @@ int zmq::stream_t::xsend (msg_t *msg_)
return 0; return 0;
} }
int zmq::stream_t::xsetsockopt (int option_, const void *optval_, int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
switch (option_) { switch (option_) {
case ZMQ_NEXT_CONNECT_PEER_ID: case ZMQ_CONNECT_RID:
if(optval_ && optvallen_) { if (optval_ && optvallen_) {
next_identity.assign((char*)optval_,optvallen_); connect_rid.assign ((char*) optval_, optvallen_);
return 0; return 0;
} }
break; break;
...@@ -179,6 +180,7 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_, ...@@ -179,6 +180,7 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
int zmq::stream_t::xrecv (msg_t *msg_) int zmq::stream_t::xrecv (msg_t *msg_)
{ {
if (prefetched) { if (prefetched) {
...@@ -260,13 +262,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) ...@@ -260,13 +262,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
unsigned char buffer [5]; unsigned char buffer [5];
buffer [0] = 0; buffer [0] = 0;
blob_t identity; blob_t identity;
if (next_identity.length()) { if (connect_rid.length ()) {
identity = blob_t((unsigned char*) next_identity.c_str(), identity = blob_t ((unsigned char*) connect_rid.c_str(),
next_identity.length()); connect_rid.length ());
next_identity.clear(); connect_rid.clear ();
} }
else { else {
put_uint32 (buffer + 1, next_peer_id++); put_uint32 (buffer + 1, next_rid++);
blob_t identity = blob_t (buffer, sizeof buffer); blob_t identity = blob_t (buffer, sizeof buffer);
memcpy (options.identity, identity.data (), identity.size ()); memcpy (options.identity, identity.data (), identity.size ());
options.identity_size = identity.size (); options.identity_size = identity.size ();
......
...@@ -84,9 +84,9 @@ namespace zmq ...@@ -84,9 +84,9 @@ namespace zmq
// If true, more outgoing message parts are expected. // If true, more outgoing message parts are expected.
bool more_out; bool more_out;
// Peer ID are generated. It's a simple increment and wrap-over // Routing IDs are generated. It's a simple increment and wrap-over
// algorithm. This value is the next ID to use (if not used already). // algorithm. This value is the next ID to use (if not used already).
uint32_t next_peer_id; uint32_t next_rid;
stream_t (const stream_t&); stream_t (const stream_t&);
const stream_t &operator = (const stream_t&); const stream_t &operator = (const stream_t&);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment