Commit 1619b3d8 authored by Martin Sustrik's avatar Martin Sustrik

Message atomicity bug in load-balancer fixed

If the peer getting the message have disconnected in the middle
of multiplart message, the remaining part of the message went
to a different peer. This patch fixes the issue.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 92c7c183
...@@ -29,6 +29,7 @@ zmq::lb_t::lb_t (own_t *sink_) : ...@@ -29,6 +29,7 @@ zmq::lb_t::lb_t (own_t *sink_) :
active (0), active (0),
current (0), current (0),
more (false), more (false),
dropping (false),
sink (sink_), sink (sink_),
terminating (false) terminating (false)
{ {
...@@ -65,9 +66,16 @@ void zmq::lb_t::terminate () ...@@ -65,9 +66,16 @@ void zmq::lb_t::terminate ()
void zmq::lb_t::terminated (writer_t *pipe_) void zmq::lb_t::terminated (writer_t *pipe_)
{ {
pipes_t::size_type index = pipes.index (pipe_);
// If we are in the middle of multipart message and current pipe
// have disconnected, we have to drop the remainder of the message.
if (index == current && more)
dropping = true;
// Remove the pipe from the list; adjust number of active pipes // Remove the pipe from the list; adjust number of active pipes
// accordingly. // accordingly.
if (pipes.index (pipe_) < active) { if (index < active) {
active--; active--;
if (current == active) if (current == active)
current = 0; current = 0;
...@@ -87,6 +95,21 @@ void zmq::lb_t::activated (writer_t *pipe_) ...@@ -87,6 +95,21 @@ void zmq::lb_t::activated (writer_t *pipe_)
int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
{ {
// Drop the message if required. If we are at the end of the message
// switch back to non-dropping mode.
if (dropping) {
more = msg_->flags & ZMQ_MSG_MORE;
if (!more)
dropping = false;
int rc = zmq_msg_close (msg_);
errno_assert (rc == 0);
rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
while (active > 0) { while (active > 0) {
if (pipes [current]->write (msg_)) { if (pipes [current]->write (msg_)) {
more = msg_->flags & ZMQ_MSG_MORE; more = msg_->flags & ZMQ_MSG_MORE;
......
...@@ -61,6 +61,9 @@ namespace zmq ...@@ -61,6 +61,9 @@ namespace zmq
// True if last we are in the middle of a multipart message. // True if last we are in the middle of a multipart message.
bool more; bool more;
// True if we are dropping current message.
bool dropping;
// Object to send events to. // Object to send events to.
class own_t *sink; class own_t *sink;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment