Commit a4843b65 authored by Martin Sustrik's avatar Martin Sustrik

Identities re-introduced

However, the "durable socket" behaviour wasn't re-added.
Identities are used solely for routing in REQ/REP pattern.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent d20ea25b
...@@ -50,6 +50,7 @@ namespace zmq ...@@ -50,6 +50,7 @@ namespace zmq
enum enum
{ {
more = 1, more = 1,
identity = 64,
shared = 128 shared = 128
}; };
......
...@@ -46,7 +46,9 @@ zmq::options_t::options_t () : ...@@ -46,7 +46,9 @@ zmq::options_t::options_t () :
ipv4only (1), ipv4only (1),
delay_on_close (true), delay_on_close (true),
delay_on_disconnect (true), delay_on_disconnect (true),
filter (false) filter (false),
send_identity (false),
recv_identity (false)
{ {
} }
......
...@@ -99,6 +99,12 @@ namespace zmq ...@@ -99,6 +99,12 @@ namespace zmq
// If 1, (X)SUB socket should filter the messages. If 0, it should not. // If 1, (X)SUB socket should filter the messages. If 0, it should not.
bool filter; bool filter;
// Sends identity to all new connections.
bool send_identity;
// Receivers identity from all new connections.
bool recv_identity;
}; };
} }
......
...@@ -65,8 +65,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, ...@@ -65,8 +65,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
peer (NULL), peer (NULL),
sink (NULL), sink (NULL),
state (active), state (active),
delay (delay_), delay (delay_)
pipe_id (0)
{ {
} }
...@@ -88,14 +87,14 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) ...@@ -88,14 +87,14 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
sink = sink_; sink = sink_;
} }
void zmq::pipe_t::set_pipe_id (uint32_t id_) void zmq::pipe_t::set_identity (const blob_t &identity_)
{ {
pipe_id = id_; identity = identity_;
} }
uint32_t zmq::pipe_t::get_pipe_id () zmq::blob_t zmq::pipe_t::get_identity ()
{ {
return pipe_id; return identity;
} }
bool zmq::pipe_t::check_read () bool zmq::pipe_t::check_read ()
......
/* /*
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
...@@ -28,6 +29,7 @@ ...@@ -28,6 +29,7 @@
#include "object.hpp" #include "object.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "array.hpp" #include "array.hpp"
#include "blob.hpp"
namespace zmq namespace zmq
{ {
...@@ -71,8 +73,8 @@ namespace zmq ...@@ -71,8 +73,8 @@ namespace zmq
void set_event_sink (i_pipe_events *sink_); void set_event_sink (i_pipe_events *sink_);
// Pipe endpoint can store an opaque ID to be used by its clients. // Pipe endpoint can store an opaque ID to be used by its clients.
void set_pipe_id (uint32_t id_); void set_identity (const blob_t &identity_);
uint32_t get_pipe_id (); blob_t get_identity ();
// Returns true if there is at least one message to read in the pipe. // Returns true if there is at least one message to read in the pipe.
bool check_read (); bool check_read ();
...@@ -183,8 +185,8 @@ namespace zmq ...@@ -183,8 +185,8 @@ namespace zmq
// asks us to. // asks us to.
bool delay; bool delay;
// Opaque ID. To be used by the clients, not the pipe itself. // Identity of the writer. Used uniquely by the reader side.
uint32_t pipe_id; blob_t identity;
// Returns true if the message is delimiter; false otherwise. // Returns true if the message is delimiter; false otherwise.
static bool is_delimiter (msg_t &msg_); static bool is_delimiter (msg_t &msg_);
......
...@@ -147,23 +147,32 @@ zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, ...@@ -147,23 +147,32 @@ zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
zmq::req_session_t::~req_session_t () zmq::req_session_t::~req_session_t ()
{ {
state = options.recv_identity ? identity : bottom;
} }
int zmq::req_session_t::write (msg_t *msg_) int zmq::req_session_t::write (msg_t *msg_)
{ {
if (state == bottom) { switch (state) {
case bottom:
if (msg_->flags () == msg_t::more && msg_->size () == 0) { if (msg_->flags () == msg_t::more && msg_->size () == 0) {
state = body; state = body;
return xreq_session_t::write (msg_); return xreq_session_t::write (msg_);
} }
} break;
else { case body:
if (msg_->flags () == msg_t::more) if (msg_->flags () == msg_t::more)
return xreq_session_t::write (msg_); return xreq_session_t::write (msg_);
if (msg_->flags () == 0) { if (msg_->flags () == 0) {
state = bottom; state = bottom;
return xreq_session_t::write (msg_); return xreq_session_t::write (msg_);
} }
break;
case identity:
if (msg_->flags () == 0) {
state = bottom;
return xreq_session_t::write (msg_);
}
break;
} }
errno = EFAULT; errno = EFAULT;
return -1; return -1;
......
...@@ -71,6 +71,7 @@ namespace zmq ...@@ -71,6 +71,7 @@ namespace zmq
private: private:
enum { enum {
identity,
bottom, bottom,
body body
} state; } state;
......
...@@ -112,7 +112,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, ...@@ -112,7 +112,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
engine (NULL), engine (NULL),
socket (socket_), socket (socket_),
io_thread (io_thread_), io_thread (io_thread_),
has_linger_timer (false) has_linger_timer (false),
send_identity (options_.send_identity),
recv_identity (options_.recv_identity)
{ {
if (protocol_) if (protocol_)
protocol = protocol_; protocol = protocol_;
...@@ -146,6 +148,16 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) ...@@ -146,6 +148,16 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
int zmq::session_base_t::read (msg_t *msg_) int zmq::session_base_t::read (msg_t *msg_)
{ {
// First message to send is identity (if required).
if (send_identity) {
zmq_assert (!(msg_->flags () & msg_t::more));
msg_->init_size (options.identity_size);
memcpy (msg_->data (), options.identity, options.identity_size);
send_identity = false;
incomplete_in = false;
return 0;
}
if (!pipe || !pipe->read (msg_)) { if (!pipe || !pipe->read (msg_)) {
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
...@@ -157,6 +169,12 @@ int zmq::session_base_t::read (msg_t *msg_) ...@@ -157,6 +169,12 @@ int zmq::session_base_t::read (msg_t *msg_)
int zmq::session_base_t::write (msg_t *msg_) int zmq::session_base_t::write (msg_t *msg_)
{ {
// First message to receive is identity (if required).
if (recv_identity) {
msg_->set_flags (msg_t::identity);
recv_identity = false;
}
if (pipe && pipe->write (msg_)) { if (pipe && pipe->write (msg_)) {
int rc = msg_->init (); int rc = msg_->init ();
errno_assert (rc == 0); errno_assert (rc == 0);
......
/* /*
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
...@@ -119,6 +120,10 @@ namespace zmq ...@@ -119,6 +120,10 @@ namespace zmq
// True is linger timer is running. // True is linger timer is running.
bool has_linger_timer; bool has_linger_timer;
// If true, identity is to be sent/recvd from the network.
bool send_identity;
bool recv_identity;
// Protocol and address to use when connecting. // Protocol and address to use when connecting.
std::string protocol; std::string protocol;
std::string address; std::string address;
......
...@@ -845,7 +845,16 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_) ...@@ -845,7 +845,16 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
void zmq::socket_base_t::extract_flags (msg_t *msg_) void zmq::socket_base_t::extract_flags (msg_t *msg_)
{ {
// Test whether IDENTITY flag is valid for this socket type.
if (unlikely (msg_->flags () & msg_t::identity)) {
zmq_assert (options.recv_identity);
printf ("identity recvd\n");
}
// Remove MORE flag.
rcvmore = msg_->flags () & msg_t::more ? true : false; rcvmore = msg_->flags () & msg_t::more ? true : false;
if (rcvmore) if (rcvmore)
msg_->reset_flags (msg_t::more); msg_->reset_flags (msg_t::more);
} }
...@@ -43,6 +43,9 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : ...@@ -43,6 +43,9 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
// all the outstanding requests from that peer. // all the outstanding requests from that peer.
// options.delay_on_disconnect = false; // options.delay_on_disconnect = false;
options.send_identity = true;
options.recv_identity = true;
prefetched_msg.init (); prefetched_msg.init ();
} }
...@@ -56,33 +59,22 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_) ...@@ -56,33 +59,22 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
// Generate a new peer ID. Take care to avoid duplicates. // Generate a new unique peer identity.
outpipes_t::iterator it = outpipes.lower_bound (next_peer_id); unsigned char buf [5];
if (!outpipes.empty ()) { buf [0] = 0;
while (true) { put_uint32 (buf + 1, next_peer_id);
if (it == outpipes.end ()) blob_t identity (buf, 5);
it = outpipes.begin (); ++next_peer_id;
if (it->first != next_peer_id)
break;
++next_peer_id;
++it;
}
}
// Add the pipe to the map out outbound pipes. // Add the pipe to the map out outbound pipes.
outpipe_t outpipe = {pipe_, true}; outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type ( bool ok = outpipes.insert (outpipes_t::value_type (
next_peer_id, outpipe)).second; identity, outpipe)).second;
zmq_assert (ok); zmq_assert (ok);
// Add the pipe to the list of inbound pipes. // Add the pipe to the list of inbound pipes.
pipe_->set_pipe_id (next_peer_id); pipe_->set_identity (identity);
fq.attach (pipe_); fq.attach (pipe_);
// Advance next peer ID so that if new connection is dropped shortly after
// its creation we don't accidentally get two subsequent peers with
// the same ID.
++next_peer_id;
} }
void zmq::xrep_t::xterminated (pipe_t *pipe_) void zmq::xrep_t::xterminated (pipe_t *pipe_)
...@@ -133,26 +125,25 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) ...@@ -133,26 +125,25 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
more_out = true; more_out = true;
// Find the pipe associated with the peer ID stored in the prefix. // Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently ignore the message. // If there's no such pipe just silently ignore the message.
if (msg_->size () == 4) { blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); outpipes_t::iterator it = outpipes.find (identity);
outpipes_t::iterator it = outpipes.find (peer_id);
if (it != outpipes.end ()) {
if (it != outpipes.end ()) { current_out = it->second.pipe;
current_out = it->second.pipe; msg_t empty;
msg_t empty; int rc = empty.init ();
int rc = empty.init (); errno_assert (rc == 0);
errno_assert (rc == 0); if (!current_out->check_write (&empty)) {
if (!current_out->check_write (&empty)) { it->second.active = false;
it->second.active = false; more_out = false;
more_out = false; current_out = NULL;
current_out = NULL;
}
rc = empty.close ();
errno_assert (rc == 0);
} }
rc = empty.close ();
errno_assert (rc == 0);
} }
} }
int rc = msg_->close (); int rc = msg_->close ();
...@@ -204,6 +195,37 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) ...@@ -204,6 +195,37 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if (rc != 0) if (rc != 0)
return -1; return -1;
// If identity is received, change the key assigned to the pipe.
if (unlikely (msg_->flags () & msg_t::identity)) {
zmq_assert (!more_in);
// Empty identity means we can preserve the auto-generated identity.
if (msg_->size () != 0) {
// Actual change of the identity.
outpipes_t::iterator it = outpipes.begin ();
while (it != outpipes.end ()) {
if (it->second.pipe == pipe) {
blob_t identity ((unsigned char*) msg_->data (),
msg_->size ());
pipe->set_identity (identity);
outpipes.erase (it);
outpipe_t outpipe = {pipe, true};
outpipes.insert (outpipes_t::value_type (identity,
outpipe));
break;
}
++it;
}
zmq_assert (it != outpipes.end ());
}
// After processing the identity, try to get the next message.
rc = fq.recvpipe (msg_, flags_, &pipe);
if (rc != 0)
return -1;
}
// If we are in the middle of reading a message, just return the next part. // If we are in the middle of reading a message, just return the next part.
if (more_in) { if (more_in) {
more_in = msg_->flags () & msg_t::more ? true : false; more_in = msg_->flags () & msg_t::more ? true : false;
...@@ -217,9 +239,11 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) ...@@ -217,9 +239,11 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
prefetched = true; prefetched = true;
rc = msg_->close (); rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
rc = msg_->init_size (4);
blob_t identity = pipe->get_identity ();
rc = msg_->init_size (identity.size ());
errno_assert (rc == 0); errno_assert (rc == 0);
put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); memcpy (msg_->data (), identity.data (), identity.size ());
msg_->set_flags (msg_t::more); msg_->set_flags (msg_t::more);
return 0; return 0;
} }
......
/* /*
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2011 iMatix Corporation Copyright (c) 2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
...@@ -27,6 +28,7 @@ ...@@ -27,6 +28,7 @@
#include "socket_base.hpp" #include "socket_base.hpp"
#include "session_base.hpp" #include "session_base.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "blob.hpp"
#include "msg.hpp" #include "msg.hpp"
#include "fq.hpp" #include "fq.hpp"
...@@ -78,7 +80,7 @@ namespace zmq ...@@ -78,7 +80,7 @@ namespace zmq
}; };
// Outbound pipes indexed by the peer IDs. // Outbound pipes indexed by the peer IDs.
typedef std::map <uint32_t, outpipe_t> outpipes_t; typedef std::map <blob_t, outpipe_t> outpipes_t;
outpipes_t outpipes; outpipes_t outpipes;
// The pipe we are currently writing to. // The pipe we are currently writing to.
......
/* /*
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
...@@ -32,6 +33,9 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : ...@@ -32,6 +33,9 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :
// If the socket is closing we can drop all the outbound requests. There'll // If the socket is closing we can drop all the outbound requests. There'll
// be noone to receive the replies anyway. // be noone to receive the replies anyway.
// options.delay_on_close = false; // options.delay_on_close = false;
options.send_identity = true;
options.recv_identity = true;
} }
zmq::xreq_t::~xreq_t () zmq::xreq_t::~xreq_t ()
...@@ -52,7 +56,15 @@ int zmq::xreq_t::xsend (msg_t *msg_, int flags_) ...@@ -52,7 +56,15 @@ int zmq::xreq_t::xsend (msg_t *msg_, int flags_)
int zmq::xreq_t::xrecv (msg_t *msg_, int flags_) int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)
{ {
return fq.recv (msg_, flags_); // XREQ socket doesn't use identities. We can safely drop it and
while (true) {
int rc = fq.recv (msg_, flags_);
if (rc != 0)
return rc;
if (likely (!(msg_->flags () & msg_t::identity)))
break;
}
return 0;
} }
bool zmq::xreq_t::xhas_in () bool zmq::xreq_t::xhas_in ()
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
int main (int argc, char *argv []) int main (int argc, char *argv [])
{ {
fprintf (stderr, "test_pair_inproc running...\n"); fprintf (stderr, "test_invalid_rep running...\n");
// Create REQ/XREP wiring. // Create REQ/XREP wiring.
void *ctx = zmq_init (1); void *ctx = zmq_init (1);
...@@ -49,23 +49,24 @@ int main (int argc, char *argv []) ...@@ -49,23 +49,24 @@ int main (int argc, char *argv [])
assert (rc == 1); assert (rc == 1);
// Receive the request. // Receive the request.
char addr [4]; char addr [32];
int addr_size;
char bottom [1]; char bottom [1];
char body [1]; char body [1];
rc = zmq_recv (xrep_socket, addr, sizeof (addr), 0); addr_size = zmq_recv (xrep_socket, addr, sizeof (addr), 0);
assert (rc == 4); assert (addr_size >= 0);
rc = zmq_recv (xrep_socket, bottom, sizeof (bottom), 0); rc = zmq_recv (xrep_socket, bottom, sizeof (bottom), 0);
assert (rc == 0); assert (rc == 0);
rc = zmq_recv (xrep_socket, body, sizeof (body), 0); rc = zmq_recv (xrep_socket, body, sizeof (body), 0);
assert (rc == 1); assert (rc == 1);
// Send invalid reply. // Send invalid reply.
rc = zmq_send (xrep_socket, addr, 4, 0); rc = zmq_send (xrep_socket, addr, addr_size, 0);
assert (rc == 4); assert (rc == addr_size);
// Send valid reply. // Send valid reply.
rc = zmq_send (xrep_socket, addr, 4, ZMQ_SNDMORE); rc = zmq_send (xrep_socket, addr, addr_size, ZMQ_SNDMORE);
assert (rc == 4); assert (rc == addr_size);
rc = zmq_send (xrep_socket, bottom, 0, ZMQ_SNDMORE); rc = zmq_send (xrep_socket, bottom, 0, ZMQ_SNDMORE);
assert (rc == 0); assert (rc == 0);
rc = zmq_send (xrep_socket, "b", 1, 0); rc = zmq_send (xrep_socket, "b", 1, 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment