Commit bc9b7f1f authored by Martin Hurton's avatar Martin Hurton

lb: bugfix - never skip active pipe when sending msg

parent 6b3c1798
...@@ -46,8 +46,11 @@ void zmq::lb_t::detach (writer_t *pipe_) ...@@ -46,8 +46,11 @@ void zmq::lb_t::detach (writer_t *pipe_)
{ {
// Remove the pipe from the list; adjust number of active pipes // Remove the pipe from the list; adjust number of active pipes
// accordingly. // accordingly.
if (pipes.index (pipe_) < active) if (pipes.index (pipe_) < active) {
active--; active--;
if (current == active)
current = 0;
}
pipes.erase (pipe_); pipes.erase (pipe_);
} }
...@@ -55,6 +58,8 @@ void zmq::lb_t::kill (writer_t *pipe_) ...@@ -55,6 +58,8 @@ void zmq::lb_t::kill (writer_t *pipe_)
{ {
// Move the pipe to the list of inactive pipes. // Move the pipe to the list of inactive pipes.
active--; active--;
if (current == active)
current = 0;
pipes.swap (pipes.index (pipe_), active); pipes.swap (pipes.index (pipe_), active);
} }
...@@ -73,11 +78,6 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) ...@@ -73,11 +78,6 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
return -1; return -1;
} }
// Move to the next pipe (load-balancing).
current++;
if (current >= active)
current = 0;
// TODO: Implement this once queue limits are in-place. // TODO: Implement this once queue limits are in-place.
zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
...@@ -89,6 +89,11 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) ...@@ -89,6 +89,11 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
int rc = zmq_msg_init (msg_); int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0); zmq_assert (rc == 0);
// Move to the next pipe (load-balancing).
current++;
if (current >= active)
current = 0;
return 0; return 0;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment