Commit a1e09fac authored by Martin Sustrik's avatar Martin Sustrik

ROUTER socket reports error when message cannot be routed

Till now, message was silently dropped if it was sent to
a non-existent peer. Now, ECANTROUTE error is returned.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 72a793f7
...@@ -79,6 +79,9 @@ The provided 'socket' was invalid. ...@@ -79,6 +79,9 @@ The provided 'socket' was invalid.
*EINTR*:: *EINTR*::
The operation was interrupted by delivery of a signal before the message was The operation was interrupted by delivery of a signal before the message was
sent. sent.
*ECANTROUTE*::
Message cannot be routed to the destination specified as the peer is either
dead or disconnected. This error makes sense only with ZMQ_ROUTER socket.
EXAMPLE EXAMPLE
......
...@@ -107,6 +107,7 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch); ...@@ -107,6 +107,7 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
#define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52) #define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52)
#define ETERM (ZMQ_HAUSNUMERO + 53) #define ETERM (ZMQ_HAUSNUMERO + 53)
#define EMTHREAD (ZMQ_HAUSNUMERO + 54) #define EMTHREAD (ZMQ_HAUSNUMERO + 54)
#define ECANTROUTE (ZMQ_HAUSNUMERO + 55)
/* This function retrieves the errno as it is known to 0MQ library. The goal */ /* This function retrieves the errno as it is known to 0MQ library. The goal */
/* of this function is to make the code 100% portable, including where 0MQ */ /* of this function is to make the code 100% portable, including where 0MQ */
......
...@@ -137,12 +137,18 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) ...@@ -137,12 +137,18 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
more_out = true; more_out = true;
// Find the pipe associated with the peer ID stored in the prefix. // Find the pipe associated with the peer ID stored in the prefix.
// If there's no such pipe just silently ignore the message. if (unlikely (msg_->size () != 4)) {
zmq_assert (msg_->size () == 4); errno = ECANTROUTE;
return -1;
}
uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
outpipes_t::iterator it = outpipes.find (peer_id); outpipes_t::iterator it = outpipes.find (peer_id);
if (unlikely (it == outpipes.end ())) {
errno = ECANTROUTE;
return -1;
}
if (it != outpipes.end ()) { // Check whether the pipe is available for writing.
current_out = it->second.pipe; current_out = it->second.pipe;
msg_t empty; msg_t empty;
int rc = empty.init (); int rc = empty.init ();
...@@ -155,7 +161,6 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) ...@@ -155,7 +161,6 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
rc = empty.close (); rc = empty.close ();
errno_assert (rc == 0); errno_assert (rc == 0);
} }
}
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment