Commit 9fab9937 authored by Martin Hurton's avatar Martin Hurton

Fix issue #406

When a peer reconnects, the router socket receives an identity
message containing this peer id. When this happens, the current
implementation crashes.

This patch makes a router socket to silently ignore all identity
messages coming from reconnected peers.
parent 84560c16
...@@ -232,6 +232,11 @@ void zmq::msg_t::reset_flags (unsigned char flags_) ...@@ -232,6 +232,11 @@ void zmq::msg_t::reset_flags (unsigned char flags_)
u.base.flags &= ~flags_; u.base.flags &= ~flags_;
} }
bool zmq::msg_t::is_identity () const
{
return (u.base.flags & identity) == identity;
}
bool zmq::msg_t::is_delimiter () bool zmq::msg_t::is_delimiter ()
{ {
return u.base.type == type_delimiter; return u.base.type == type_delimiter;
......
...@@ -69,6 +69,7 @@ namespace zmq ...@@ -69,6 +69,7 @@ namespace zmq
unsigned char flags (); unsigned char flags ();
void set_flags (unsigned char flags_); void set_flags (unsigned char flags_);
void reset_flags (unsigned char flags_); void reset_flags (unsigned char flags_);
bool is_identity () const;
bool is_delimiter (); bool is_delimiter ();
bool is_vsm (); bool is_vsm ();
......
...@@ -211,13 +211,17 @@ int zmq::router_t::xrecv (msg_t *msg_, int flags_) ...@@ -211,13 +211,17 @@ int zmq::router_t::xrecv (msg_t *msg_, int flags_)
pipe_t *pipe = NULL; pipe_t *pipe = NULL;
int rc = fq.recvpipe (msg_, &pipe); int rc = fq.recvpipe (msg_, &pipe);
if (rc != 0) {
errno = EAGAIN; // It's possible that we receive peer's identity. That happens
// after reconnection. The current implementation assumes that
// the peer always uses the same identity.
// TODO: handle the situation when the peer changes its identity.
while (rc == 0 && msg_->is_identity ())
rc = fq.recvpipe (msg_, &pipe);
if (rc != 0)
return -1; return -1;
}
// Identity is not expected
zmq_assert ((msg_->flags () & msg_t::identity) == 0);
zmq_assert (pipe != NULL); zmq_assert (pipe != NULL);
// If we are in the middle of reading a message, just return the next part. // If we are in the middle of reading a message, just return the next part.
...@@ -267,11 +271,18 @@ bool zmq::router_t::xhas_in () ...@@ -267,11 +271,18 @@ bool zmq::router_t::xhas_in ()
// The message, if read, is kept in the pre-fetch buffer. // The message, if read, is kept in the pre-fetch buffer.
pipe_t *pipe = NULL; pipe_t *pipe = NULL;
int rc = fq.recvpipe (&prefetched_msg, &pipe); int rc = fq.recvpipe (&prefetched_msg, &pipe);
// It's possible that we receive peer's identity. That happens
// after reconnection. The current implementation assumes that
// the peer always uses the same identity.
// TODO: handle the situation when the peer changes its identity.
while (rc == 0 && prefetched_msg.is_identity ())
rc = fq.recvpipe (&prefetched_msg, &pipe);
if (rc != 0) if (rc != 0)
return false; return false;
// Identity is not expected zmq_assert (pipe != NULL);
zmq_assert ((prefetched_msg.flags () & msg_t::identity) == 0);
blob_t identity = pipe->get_identity (); blob_t identity = pipe->get_identity ();
rc = prefetched_id.init_size (identity.size ()); rc = prefetched_id.init_size (identity.size ());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment