Commit bbfac783 authored by Martin Sustrik's avatar Martin Sustrik

multi-part message work with UPSTREAM/DOWNSTREAM

parent ed291b02
...@@ -25,7 +25,8 @@ ...@@ -25,7 +25,8 @@
zmq::lb_t::lb_t () : zmq::lb_t::lb_t () :
active (0), active (0),
current (0) current (0),
tbc (false)
{ {
} }
...@@ -44,6 +45,8 @@ void zmq::lb_t::attach (writer_t *pipe_) ...@@ -44,6 +45,8 @@ void zmq::lb_t::attach (writer_t *pipe_)
void zmq::lb_t::detach (writer_t *pipe_) void zmq::lb_t::detach (writer_t *pipe_)
{ {
zmq_assert (!tbc || pipes [current] != pipe_);
// Remove the pipe from the list; adjust number of active pipes // Remove the pipe from the list; adjust number of active pipes
// accordingly. // accordingly.
if (pipes.index (pipe_) < active) { if (pipes.index (pipe_) < active) {
...@@ -64,9 +67,12 @@ void zmq::lb_t::revive (writer_t *pipe_) ...@@ -64,9 +67,12 @@ void zmq::lb_t::revive (writer_t *pipe_)
int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
{ {
while (active > 0) { while (active > 0) {
if (pipes [current]->write (msg_)) if (pipes [current]->write (msg_)) {
tbc = msg_->flags & ZMQ_MSG_TBC;
break; break;
}
zmq_assert (!tbc);
active--; active--;
if (current < active) if (current < active)
pipes.swap (current, active); pipes.swap (current, active);
...@@ -80,20 +86,27 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) ...@@ -80,20 +86,27 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
return -1; return -1;
} }
pipes [current]->flush (); // If it's final part of the message we can fluch it downstream and
// continue round-robinning (load balance).
if (!tbc) {
pipes [current]->flush ();
current = (current + 1) % active;
}
// Detach the message from the data buffer. // Detach the message from the data buffer.
int rc = zmq_msg_init (msg_); int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0); zmq_assert (rc == 0);
// Move to the next pipe (load-balancing).
current = (current + 1) % active;
return 0; return 0;
} }
bool zmq::lb_t::has_out () bool zmq::lb_t::has_out ()
{ {
// If one part of the message was already written we can definitely
// write the rest of the message.
if (tbc)
return true;
while (active > 0) { while (active > 0) {
if (pipes [current]->check_write ()) if (pipes [current]->check_write ())
return true; return true;
......
...@@ -53,6 +53,9 @@ namespace zmq ...@@ -53,6 +53,9 @@ namespace zmq
// Points to the last pipe that the most recent message was sent to. // Points to the last pipe that the most recent message was sent to.
pipes_t::size_type current; pipes_t::size_type current;
// True if last we are in the middle of a multipart message.
bool tbc;
lb_t (const lb_t&); lb_t (const lb_t&);
void operator = (const lb_t&); void operator = (const lb_t&);
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment