Commit 6b873d4f authored by Martin Sustrik's avatar Martin Sustrik

ROUTER socket blocks on SNDHWM

Till now the message was droppen in such case.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent a1e09fac
......@@ -129,40 +129,44 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
if (!more_out) {
zmq_assert (!current_out);
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
// TODO: The connections should be killed instead.
if (msg_->flags () & msg_t::label) {
more_out = true;
// Find the pipe associated with the peer ID stored in the prefix.
if (unlikely (msg_->size () != 4)) {
errno = ECANTROUTE;
return -1;
}
uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
outpipes_t::iterator it = outpipes.find (peer_id);
if (unlikely (it == outpipes.end ())) {
errno = ECANTROUTE;
return -1;
}
// Check whether the pipe is available for writing.
current_out = it->second.pipe;
msg_t empty;
int rc = empty.init ();
errno_assert (rc == 0);
if (!current_out->check_write (&empty)) {
it->second.active = false;
more_out = false;
current_out = NULL;
}
// The first message part has to be label.
if (unlikely (!(msg_->flags () & msg_t::label))) {
errno = EFSM;
return -1;
}
// Find the pipe associated with the peer ID stored in the message.
if (unlikely (msg_->size () != 4)) {
errno = ECANTROUTE;
return -1;
}
uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
outpipes_t::iterator it = outpipes.find (peer_id);
if (unlikely (it == outpipes.end ())) {
errno = ECANTROUTE;
return -1;
}
// Check whether the pipe is available for writing.
msg_t empty;
int rc = empty.init ();
errno_assert (rc == 0);
if (!it->second.pipe->check_write (&empty)) {
rc = empty.close ();
errno_assert (rc == 0);
it->second.active = false;
errno = EAGAIN;
return -1;
}
rc = empty.close ();
errno_assert (rc == 0);
int rc = msg_->close ();
// Mark the pipe to send the message to.
current_out = it->second.pipe;
more_out = true;
// Clean up the message object.
rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init ();
errno_assert (rc == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment