Commit 923609b0 authored by Martin Hurton's avatar Martin Hurton

Implement flow control for ZMQ_REQ sockets

parent 42e575cb
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
zmq::req_t::req_t (class app_thread_t *parent_) : zmq::req_t::req_t (class app_thread_t *parent_) :
socket_base_t (parent_), socket_base_t (parent_),
active (0),
current (0), current (0),
waiting_for_reply (false), waiting_for_reply (false),
reply_pipe_active (false), reply_pipe_active (false),
...@@ -45,7 +46,12 @@ void zmq::req_t::xattach_pipes (class reader_t *inpipe_, ...@@ -45,7 +46,12 @@ void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
zmq_assert (in_pipes.size () == out_pipes.size ()); zmq_assert (in_pipes.size () == out_pipes.size ());
in_pipes.push_back (inpipe_); in_pipes.push_back (inpipe_);
in_pipes.swap (active, in_pipes.size () - 1);
out_pipes.push_back (outpipe_); out_pipes.push_back (outpipe_);
out_pipes.swap (active, out_pipes.size () - 1);
active++;
} }
void zmq::req_t::xdetach_inpipe (class reader_t *pipe_) void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
...@@ -61,16 +67,28 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_) ...@@ -61,16 +67,28 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
in_pipes_t::size_type index = in_pipes.index (pipe_); in_pipes_t::size_type index = in_pipes.index (pipe_);
// If corresponding outpipe is still in place simply nullify the pointer // If the corresponding outpipe is still in place nullify the pointer
// to the inpipe. // to the inpipe ane move both pipes into inactive zone.
if (out_pipes [index]) { if (out_pipes [index]) {
in_pipes [index] = NULL; in_pipes [index] = NULL;
if (index < active) {
active--;
in_pipes.swap (index, active);
out_pipes.swap (index, active);
if (current = active)
current = 0;
}
return; return;
} }
// Now both inpipe and outpipe are detached. Remove them from the lists. // Now both inpipe and outpipe are detached. Remove them from the lists.
in_pipes.erase (index); in_pipes.erase (index);
out_pipes.erase (index); out_pipes.erase (index);
if (index < active) {
active--;
if (current == active)
current = 0;
}
} }
void zmq::req_t::xdetach_outpipe (class writer_t *pipe_) void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
...@@ -80,20 +98,33 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_) ...@@ -80,20 +98,33 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
out_pipes_t::size_type index = out_pipes.index (pipe_); out_pipes_t::size_type index = out_pipes.index (pipe_);
// If corresponding inpipe is still in place simply nullify the pointer // If the corresponding inpipe is still in place nullify the pointer
// to the outpipe. // to the outpipe and move both pipes into inactive zone.
if (in_pipes [index]) { if (in_pipes [index]) {
out_pipes [index] = NULL; out_pipes [index] = NULL;
if (index < active) {
active--;
in_pipes.swap (index, active);
out_pipes.swap (index, active);
if (current == active)
current = 0;
}
return; return;
} }
// Now both inpipe and outpipe are detached. Remove them from the lists. // Now both inpipe and outpipe are detached. Remove them from the lists.
in_pipes.erase (index); in_pipes.erase (index);
out_pipes.erase (index); out_pipes.erase (index);
if (index < active) {
active--;
if (current == active)
current = 0;
}
} }
void zmq::req_t::xkill (class reader_t *pipe_) void zmq::req_t::xkill (class reader_t *pipe_)
{ {
zmq_assert (waiting_for_reply);
zmq_assert (pipe_ == reply_pipe); zmq_assert (pipe_ == reply_pipe);
reply_pipe_active = false; reply_pipe_active = false;
...@@ -104,13 +135,19 @@ void zmq::req_t::xrevive (class reader_t *pipe_) ...@@ -104,13 +135,19 @@ void zmq::req_t::xrevive (class reader_t *pipe_)
// TODO: Actually, misbehaving peer can cause this kind of thing. // TODO: Actually, misbehaving peer can cause this kind of thing.
// Handle it decently, presumably kill the offending connection. // Handle it decently, presumably kill the offending connection.
zmq_assert (pipe_ == reply_pipe); zmq_assert (pipe_ == reply_pipe);
reply_pipe_active = true; reply_pipe_active = true;
} }
void zmq::req_t::xrevive (class writer_t *pipe_) void zmq::req_t::xrevive (class writer_t *pipe_)
{ {
zmq_not_implemented (); out_pipes_t::size_type index = out_pipes.index (pipe_);
zmq_assert (index >= active);
if (in_pipes [index] != NULL) {
in_pipes.swap (index, active);
out_pipes.swap (index, active);
active++;
}
} }
int zmq::req_t::xsetsockopt (int option_, const void *optval_, int zmq::req_t::xsetsockopt (int option_, const void *optval_,
...@@ -129,22 +166,22 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -129,22 +166,22 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
return -1; return -1;
} }
if (out_pipes.empty ()) { while (active > 0) {
errno = EAGAIN; if (out_pipes [current]->check_write ())
return -1; break;
}
current++; active--;
if (current >= out_pipes.size ()) if (current < active) {
in_pipes.swap (current, active);
out_pipes.swap (current, active);
}
else
current = 0; current = 0;
}
// TODO: Infinite loop can result here. Integrate the algorithm with if (active == 0) {
// the active pipes list (i.e. pipe pair that has one pipe missing is errno = EAGAIN;
// considered to be inactive. return -1;
while (!in_pipes [current] || !out_pipes [current]) {
current++;
if (current >= out_pipes.size ())
current = 0;
} }
// Push message to the selected pipe. // Push message to the selected pipe.
...@@ -164,6 +201,9 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -164,6 +201,9 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
int rc = zmq_msg_init (msg_); int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0); zmq_assert (rc == 0);
// Move to the next pipe (load-balancing).
current = (current + 1) % active;
return 0; return 0;
} }
...@@ -213,7 +253,23 @@ bool zmq::req_t::xhas_in () ...@@ -213,7 +253,23 @@ bool zmq::req_t::xhas_in ()
bool zmq::req_t::xhas_out () bool zmq::req_t::xhas_out ()
{ {
return !waiting_for_reply; if (waiting_for_reply)
return false;
while (active > 0) {
if (out_pipes [current]->check_write ())
return true;;
active--;
if (current < active) {
in_pipes.swap (current, active);
out_pipes.swap (current, active);
}
else
current = 0;
}
return false;
} }
...@@ -64,6 +64,9 @@ namespace zmq ...@@ -64,6 +64,9 @@ namespace zmq
typedef yarray_t <class reader_t> in_pipes_t; typedef yarray_t <class reader_t> in_pipes_t;
in_pipes_t in_pipes; in_pipes_t in_pipes;
// Number of active pipes.
size_t active;
// Req_t load-balances the requests - 'current' points to the session // Req_t load-balances the requests - 'current' points to the session
// that's processing the request at the moment. // that's processing the request at the moment.
out_pipes_t::size_type current; out_pipes_t::size_type current;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment