Commit d82389a7 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #834 from Prarrot/master

Updated STREAM and ROUTER sockets to allow for pre-naming of outbound connections
parents a66c47f9 f13512a9
......@@ -293,7 +293,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_IPC_FILTER_PID 58
#define ZMQ_IPC_FILTER_UID 59
#define ZMQ_IPC_FILTER_GID 60
#define ZMQ_NEXT_IDENTITY 61
/* Message options */
#define ZMQ_MORE 1
#define ZMQ_SRCFD 2
......
......@@ -88,6 +88,12 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
int value = is_int? *((int *) optval_): 0;
switch (option_) {
case ZMQ_NEXT_IDENTITY:
if(optval_ && optvallen_) {
next_identity.assign((char*)optval_,optvallen_);
return 0;
}
break;
case ZMQ_ROUTER_RAW:
if (is_int && value >= 0) {
raw_sock = (value != 0);
......@@ -382,8 +388,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
blob_t identity;
bool ok;
if (options.raw_sock) { // Always assign identity for raw-socket
unsigned char buf [5];
if (next_identity.length()) {
identity = blob_t((unsigned char*) next_identity.c_str(),
next_identity.length());
next_identity.clear();
}
else if (options.raw_sock) { // Always assign identity for raw-socket
unsigned char buf [5];
buf [0] = 0;
put_uint32 (buf + 1, next_peer_id++);
identity = blob_t (buf, sizeof buf);
......
......@@ -163,7 +163,25 @@ int zmq::stream_t::xsend (msg_t *msg_)
return 0;
}
int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
bool is_int = (optvallen_ == sizeof (int));
int value = is_int? *((int *) optval_): 0;
switch (option_) {
case ZMQ_NEXT_IDENTITY:
if(optval_ && optvallen_) {
next_identity.assign((char*)optval_,optvallen_);
return 0;
}
break;
default:
break;
}
errno = EINVAL;
return -1;
}
int zmq::stream_t::xrecv (msg_t *msg_)
{
if (prefetched) {
......@@ -244,12 +262,18 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
// Always assign identity for raw-socket
unsigned char buffer [5];
buffer [0] = 0;
put_uint32 (buffer + 1, next_peer_id++);
blob_t identity = blob_t (buffer, sizeof buffer);
memcpy (options.identity, identity.data (), identity.size ());
options.identity_size = identity.size ();
blob_t identity;
if (next_identity.length()) {
identity = blob_t((unsigned char*) next_identity.c_str(),
next_identity.length());
next_identity.clear();
}
else {
put_uint32 (buffer + 1, next_peer_id++);
blob_t identity = blob_t (buffer, sizeof buffer);
memcpy (options.identity, identity.data (), identity.size ());
options.identity_size = identity.size ();
}
pipe_->set_identity (identity);
// Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true};
......
......@@ -47,7 +47,7 @@ namespace zmq
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
private:
// Generate peer's id and update lookup map
void identify_peer (pipe_t *pipe_);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment