Commit f9521c6b authored by Martin Hurton's avatar Martin Hurton

PGM: implement flow control

parent 61ee6fae
...@@ -38,7 +38,9 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, ...@@ -38,7 +38,9 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
io_object_t (parent_), io_object_t (parent_),
pgm_socket (true, options_), pgm_socket (true, options_),
options (options_), options (options_),
inout (NULL) inout (NULL),
mru_decoder (NULL),
pending_bytes (0)
{ {
} }
...@@ -76,6 +78,9 @@ void zmq::pgm_receiver_t::unplug () ...@@ -76,6 +78,9 @@ void zmq::pgm_receiver_t::unplug ()
} }
peers.clear (); peers.clear ();
mru_decoder = NULL;
pending_bytes = 0;
// Stop polling. // Stop polling.
rm_fd (socket_handle); rm_fd (socket_handle);
rm_fd (pipe_handle); rm_fd (pipe_handle);
...@@ -90,7 +95,30 @@ void zmq::pgm_receiver_t::revive () ...@@ -90,7 +95,30 @@ void zmq::pgm_receiver_t::revive ()
void zmq::pgm_receiver_t::resume_input () void zmq::pgm_receiver_t::resume_input ()
{ {
zmq_not_implemented (); // It is possible that the most recently used decoder
// processed the whole buffer but failed to write
// the last message into the pipe.
if (pending_bytes == 0) {
if (mru_decoder != NULL)
mru_decoder->process_buffer (NULL, 0);
return;
}
zmq_assert (mru_decoder != NULL);
zmq_assert (pending_ptr != NULL);
// Ask the decoder to process remaining data.
size_t n = mru_decoder->process_buffer (pending_ptr, pending_bytes);
pending_bytes -= n;
if (pending_bytes > 0)
return;
// Resume polling.
set_pollin (pipe_handle);
set_pollin (socket_handle);
in_event ();
} }
void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_) void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)
...@@ -111,6 +139,8 @@ void zmq::pgm_receiver_t::in_event () ...@@ -111,6 +139,8 @@ void zmq::pgm_receiver_t::in_event ()
unsigned char *data = NULL; unsigned char *data = NULL;
const pgm_tsi_t *tsi = NULL; const pgm_tsi_t *tsi = NULL;
zmq_assert (pending_bytes == 0);
// TODO: This loop can effectively block other engines in the same I/O // TODO: This loop can effectively block other engines in the same I/O
// thread in the case of high load. // thread in the case of high load.
while (true) { while (true) {
...@@ -130,6 +160,8 @@ void zmq::pgm_receiver_t::in_event () ...@@ -130,6 +160,8 @@ void zmq::pgm_receiver_t::in_event ()
if (received == -1) { if (received == -1) {
zmq_assert (it != peers.end ()); zmq_assert (it != peers.end ());
it->second.joined = false; it->second.joined = false;
if (it->second.decoder == mru_decoder)
mru_decoder = NULL;
if (it->second.decoder != NULL) { if (it->second.decoder != NULL) {
delete it->second.decoder; delete it->second.decoder;
it->second.decoder = NULL; it->second.decoder = NULL;
...@@ -172,10 +204,20 @@ void zmq::pgm_receiver_t::in_event () ...@@ -172,10 +204,20 @@ void zmq::pgm_receiver_t::in_event ()
it->second.decoder->set_inout (inout); it->second.decoder->set_inout (inout);
} }
mru_decoder = it->second.decoder;
// Push all the data to the decoder. // Push all the data to the decoder.
// TODO: process_buffer may not process entire buffer!
ssize_t processed = it->second.decoder->process_buffer (data, received); ssize_t processed = it->second.decoder->process_buffer (data, received);
zmq_assert (processed == received); if (processed < received) {
// Save some state so we can resume the decoding process later.
pending_bytes = received - processed;
pending_ptr = data + processed;
// Stop polling.
reset_pollin (pipe_handle);
reset_pollin (socket_handle);
break;
}
} }
// Flush any messages decoder may have produced. // Flush any messages decoder may have produced.
......
...@@ -98,6 +98,15 @@ namespace zmq ...@@ -98,6 +98,15 @@ namespace zmq
// Parent session. // Parent session.
i_inout *inout; i_inout *inout;
// Most recently used decoder.
zmq_decoder_t *mru_decoder;
// Number of bytes not consumed by the decoder due to pipe overflow.
size_t pending_bytes;
// Pointer to data still waiting to be processed by the decoder.
unsigned char *pending_ptr;
// Poll handle associated with PGM socket. // Poll handle associated with PGM socket.
handle_t socket_handle; handle_t socket_handle;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment