Commit 01396768 authored by Martin Hurton's avatar Martin Hurton

fq: code cleanup

parent 8152502f
...@@ -75,7 +75,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_) ...@@ -75,7 +75,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
errno_assert (rc == 0); errno_assert (rc == 0);
// Round-robin over the pipes to get the next message. // Round-robin over the pipes to get the next message.
for (pipes_t::size_type count = active; count != 0; count--) { while (active > 0) {
// Try to fetch new message. If we've already read part of the message // Try to fetch new message. If we've already read part of the message
// subsequent part should be immediately available. // subsequent part should be immediately available.
...@@ -84,7 +84,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_) ...@@ -84,7 +84,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
// Check the atomicity of the message. If we've already received the // Check the atomicity of the message. If we've already received the
// first part of the message we should get the remaining parts // first part of the message we should get the remaining parts
// without blocking. // without blocking.
zmq_assert (!(more && !fetched)); zmq_assert (!more || fetched);
// Note that when message is not fetched, current pipe is deactivated // Note that when message is not fetched, current pipe is deactivated
// and replaced by another active pipe. Thus we don't have to increase // and replaced by another active pipe. Thus we don't have to increase
...@@ -92,21 +92,16 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_) ...@@ -92,21 +92,16 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
if (fetched) { if (fetched) {
if (pipe_) if (pipe_)
*pipe_ = pipes [current]; *pipe_ = pipes [current];
more = more = msg_->flags () & msg_t::more? true: false;
msg_->flags () & msg_t::more ? true : false; if (!more)
if (!more) { current = (current + 1) % active;
current++;
if (current >= active)
current = 0;
}
return 0; return 0;
} }
else {
active--; active--;
pipes.swap (current, active); pipes.swap (current, active);
if (current == active) if (current == active)
current = 0; current = 0;
}
} }
// No message is available. Initialise the output parameter // No message is available. Initialise the output parameter
...@@ -127,7 +122,7 @@ bool zmq::fq_t::has_in () ...@@ -127,7 +122,7 @@ bool zmq::fq_t::has_in ()
// queueing algorithm. If there are no messages available current will // queueing algorithm. If there are no messages available current will
// get back to its original value. Otherwise it'll point to the first // get back to its original value. Otherwise it'll point to the first
// pipe holding messages, skipping only pipes with no messages available. // pipe holding messages, skipping only pipes with no messages available.
for (pipes_t::size_type count = active; count != 0; count--) { while (active > 0) {
if (pipes [current]->check_read ()) if (pipes [current]->check_read ())
return true; return true;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment