Commit 61ee6fae authored by Martin Hurton's avatar Martin Hurton

Implement flow control

This commit introduces the necessary changes necessary
for implementing flow control. None of the socket types
implements the flow control yet. The code will crash when
the flow control is enabled and the thw lwm is reached.

The following commits will add flow-control support for
individual socket types.
parent 31d36104
...@@ -40,6 +40,7 @@ namespace zmq ...@@ -40,6 +40,7 @@ namespace zmq
attach, attach,
bind, bind,
revive, revive,
reader_info,
pipe_term, pipe_term,
pipe_term_ack, pipe_term_ack,
term_req, term_req,
...@@ -84,6 +85,13 @@ namespace zmq ...@@ -84,6 +85,13 @@ namespace zmq
struct { struct {
} revive; } revive;
// Sent by pipe reader to inform pipe writer
// about how many messages it has read so far.
// Used to implement the flow control.
struct {
uint64_t msgs_read;
} reader_info;
// Sent by pipe reader to pipe writer to ask it to terminate // Sent by pipe reader to pipe writer to ask it to terminate
// its end of the pipe. // its end of the pipe.
struct { struct {
......
...@@ -65,6 +65,11 @@ void zmq::downstream_t::xrevive (class reader_t *pipe_) ...@@ -65,6 +65,11 @@ void zmq::downstream_t::xrevive (class reader_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::downstream_t::xrevive (class writer_t *pipe_)
{
zmq_not_implemented ();
}
int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
...@@ -104,4 +109,3 @@ bool zmq::downstream_t::xhas_out () ...@@ -104,4 +109,3 @@ bool zmq::downstream_t::xhas_out ()
return lb.has_out (); return lb.has_out ();
} }
...@@ -40,6 +40,7 @@ namespace zmq ...@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_); void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_); void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
......
...@@ -115,3 +115,9 @@ namespace zmq ...@@ -115,3 +115,9 @@ namespace zmq
} while (false) } while (false)
#endif #endif
#define zmq_not_implemented() \
do {\
fprintf (stderr, "Hic sunt leones (%s:%d)\n", __FILE__, __LINE__);\
abort ();\
} while (false)
...@@ -35,6 +35,7 @@ namespace zmq ...@@ -35,6 +35,7 @@ namespace zmq
virtual void detach_outpipe (class writer_t *pipe_) = 0; virtual void detach_outpipe (class writer_t *pipe_) = 0;
virtual void kill (class reader_t *pipe_) = 0; virtual void kill (class reader_t *pipe_) = 0;
virtual void revive (class reader_t *pipe_) = 0; virtual void revive (class reader_t *pipe_) = 0;
virtual void revive (class writer_t *pipe_) = 0;
}; };
} }
......
...@@ -41,6 +41,8 @@ namespace zmq ...@@ -41,6 +41,8 @@ namespace zmq
// are messages to send available. // are messages to send available.
virtual void revive () = 0; virtual void revive () = 0;
virtual void resume_input () = 0;
// Engine should add the prefix supplied to all inbound messages. // Engine should add the prefix supplied to all inbound messages.
virtual void add_prefix (const blob_t &identity_) = 0; virtual void add_prefix (const blob_t &identity_) = 0;
......
...@@ -54,15 +54,6 @@ void zmq::lb_t::detach (writer_t *pipe_) ...@@ -54,15 +54,6 @@ void zmq::lb_t::detach (writer_t *pipe_)
pipes.erase (pipe_); pipes.erase (pipe_);
} }
void zmq::lb_t::kill (writer_t *pipe_)
{
// Move the pipe to the list of inactive pipes.
active--;
if (current == active)
current = 0;
pipes.swap (pipes.index (pipe_), active);
}
void zmq::lb_t::revive (writer_t *pipe_) void zmq::lb_t::revive (writer_t *pipe_)
{ {
// Move the pipe to the list of active pipes. // Move the pipe to the list of active pipes.
...@@ -72,17 +63,24 @@ void zmq::lb_t::revive (writer_t *pipe_) ...@@ -72,17 +63,24 @@ void zmq::lb_t::revive (writer_t *pipe_)
int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
{ {
while (active > 0) {
if (pipes [current]->write (msg_))
break;
active--;
if (current < active)
pipes.swap (current, active);
else
current = 0;
}
// If there are no pipes we cannot send the message. // If there are no pipes we cannot send the message.
if (pipes.empty ()) { if (active == 0) {
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
// TODO: Implement this once queue limits are in-place. if (!(flags_ & ZMQ_NOFLUSH))
zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
// Push message to the selected pipe.
pipes [current]->write (msg_);
pipes [current]->flush (); pipes [current]->flush ();
// Detach the message from the data buffer. // Detach the message from the data buffer.
...@@ -90,24 +88,21 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) ...@@ -90,24 +88,21 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
zmq_assert (rc == 0); zmq_assert (rc == 0);
// Move to the next pipe (load-balancing). // Move to the next pipe (load-balancing).
current++; current = (current + 1) % active;
if (current >= active)
current = 0;
return 0; return 0;
} }
bool zmq::lb_t::has_out () bool zmq::lb_t::has_out ()
{ {
for (int count = active; count != 0; count--) { while (active > 0) {
if (pipes [current]->check_write ())
// We should be able to write at least 1-byte message to interrupt
// polling for POLLOUT.
// TODO: Shouldn't we use a saner value here?
if (pipes [current]->check_write (1))
return true; return true;
current++;
if (current >= active) active--;
if (current < active)
pipes.swap (current, active);
else
current = 0; current = 0;
} }
......
...@@ -36,7 +36,6 @@ namespace zmq ...@@ -36,7 +36,6 @@ namespace zmq
void attach (class writer_t *pipe_); void attach (class writer_t *pipe_);
void detach (class writer_t *pipe_); void detach (class writer_t *pipe_);
void kill (class writer_t *pipe_);
void revive (class writer_t *pipe_); void revive (class writer_t *pipe_);
int send (zmq_msg_t *msg_, int flags_); int send (zmq_msg_t *msg_, int flags_);
bool has_out (); bool has_out ();
......
...@@ -95,6 +95,10 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -95,6 +95,10 @@ void zmq::object_t::process_command (command_t &cmd_)
process_seqnum (); process_seqnum ();
break; break;
case command_t::reader_info:
process_reader_info (cmd_.args.reader_info.msgs_read);
break;
case command_t::pipe_term: case command_t::pipe_term:
process_pipe_term (); process_pipe_term ();
return; return;
...@@ -249,6 +253,16 @@ void zmq::object_t::send_revive (object_t *destination_) ...@@ -249,6 +253,16 @@ void zmq::object_t::send_revive (object_t *destination_)
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_reader_info (writer_t *destination_,
uint64_t msgs_read_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::reader_info;
cmd.args.reader_info.msgs_read = msgs_read_;
send_command (cmd);
}
void zmq::object_t::send_pipe_term (writer_t *destination_) void zmq::object_t::send_pipe_term (writer_t *destination_)
{ {
command_t cmd; command_t cmd;
...@@ -323,6 +337,11 @@ void zmq::object_t::process_revive () ...@@ -323,6 +337,11 @@ void zmq::object_t::process_revive ()
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_reader_info (uint64_t msgs_read_)
{
zmq_assert (false);
}
void zmq::object_t::process_pipe_term () void zmq::object_t::process_pipe_term ()
{ {
zmq_assert (false); zmq_assert (false);
......
...@@ -71,6 +71,8 @@ namespace zmq ...@@ -71,6 +71,8 @@ namespace zmq
class reader_t *in_pipe_, class writer_t *out_pipe_, class reader_t *in_pipe_, class writer_t *out_pipe_,
const blob_t &peer_identity_, bool inc_seqnum_ = true); const blob_t &peer_identity_, bool inc_seqnum_ = true);
void send_revive (class object_t *destination_); void send_revive (class object_t *destination_);
void send_reader_info (class writer_t *destination_,
uint64_t msgs_read_);
void send_pipe_term (class writer_t *destination_); void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_); void send_pipe_term_ack (class reader_t *destination_);
void send_term_req (class socket_base_t *destination_, void send_term_req (class socket_base_t *destination_,
...@@ -88,6 +90,7 @@ namespace zmq ...@@ -88,6 +90,7 @@ namespace zmq
virtual void process_bind (class reader_t *in_pipe_, virtual void process_bind (class reader_t *in_pipe_,
class writer_t *out_pipe_, const blob_t &peer_identity_); class writer_t *out_pipe_, const blob_t &peer_identity_);
virtual void process_revive (); virtual void process_revive ();
virtual void process_reader_info (uint64_t msgs_read_);
virtual void process_pipe_term (); virtual void process_pipe_term ();
virtual void process_pipe_term_ack (); virtual void process_pipe_term_ack ();
virtual void process_term_req (class owned_t *object_); virtual void process_term_req (class owned_t *object_);
......
...@@ -45,19 +45,19 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -45,19 +45,19 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
switch (option_) { switch (option_) {
case ZMQ_HWM: case ZMQ_HWM:
if (optvallen_ != sizeof (int64_t)) { if (optvallen_ != sizeof (uint64_t)) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
hwm = *((int64_t*) optval_); hwm = *((uint64_t*) optval_);
return 0; return 0;
case ZMQ_LWM: case ZMQ_LWM:
if (optvallen_ != sizeof (int64_t)) { if (optvallen_ != sizeof (uint64_t)) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
lwm = *((int64_t*) optval_); lwm = *((uint64_t*) optval_);
return 0; return 0;
case ZMQ_SWAP: case ZMQ_SWAP:
......
...@@ -33,8 +33,8 @@ namespace zmq ...@@ -33,8 +33,8 @@ namespace zmq
int setsockopt (int option_, const void *optval_, size_t optvallen_); int setsockopt (int option_, const void *optval_, size_t optvallen_);
int64_t hwm; uint64_t hwm;
int64_t lwm; uint64_t lwm;
int64_t swap; int64_t swap;
uint64_t affinity; uint64_t affinity;
blob_t identity; blob_t identity;
......
...@@ -73,6 +73,11 @@ void zmq::p2p_t::xrevive (class reader_t *pipe_) ...@@ -73,6 +73,11 @@ void zmq::p2p_t::xrevive (class reader_t *pipe_)
alive = true; alive = true;
} }
void zmq::p2p_t::xrevive (class writer_t *pipe_)
{
zmq_not_implemented ();
}
int zmq::p2p_t::xsetsockopt (int option_, const void *optval_, int zmq::p2p_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
...@@ -87,10 +92,8 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -87,10 +92,8 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_)
return -1; return -1;
} }
// TODO: Implement this once queue limits are in-place. bool written = outpipe->write (msg_);
zmq_assert (outpipe->check_write (zmq_msg_size (msg_))); zmq_assert (written);
outpipe->write (msg_);
if (!(flags_ & ZMQ_NOFLUSH)) if (!(flags_ & ZMQ_NOFLUSH))
outpipe->flush (); outpipe->flush ();
......
...@@ -39,6 +39,7 @@ namespace zmq ...@@ -39,6 +39,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_); void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_); void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
......
...@@ -88,6 +88,11 @@ void zmq::pgm_receiver_t::revive () ...@@ -88,6 +88,11 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false); zmq_assert (false);
} }
void zmq::pgm_receiver_t::resume_input ()
{
zmq_not_implemented ();
}
void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_) void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)
{ {
// No need for tracerouting functionality in PGM socket at the moment. // No need for tracerouting functionality in PGM socket at the moment.
......
...@@ -54,6 +54,7 @@ namespace zmq ...@@ -54,6 +54,7 @@ namespace zmq
void plug (struct i_inout *inout_); void plug (struct i_inout *inout_);
void unplug (); void unplug ();
void revive (); void revive ();
void resume_input ();
void add_prefix (const blob_t &identity_); void add_prefix (const blob_t &identity_);
void trim_prefix (); void trim_prefix ();
......
...@@ -102,6 +102,11 @@ void zmq::pgm_sender_t::revive () ...@@ -102,6 +102,11 @@ void zmq::pgm_sender_t::revive ()
out_event (); out_event ();
} }
void zmq::pgm_sender_t::resume_input ()
{
zmq_assert (false);
}
void zmq::pgm_sender_t::add_prefix (const blob_t &identity_) void zmq::pgm_sender_t::add_prefix (const blob_t &identity_)
{ {
// No need for tracerouting functionality in PGM socket at the moment. // No need for tracerouting functionality in PGM socket at the moment.
......
...@@ -52,6 +52,7 @@ namespace zmq ...@@ -52,6 +52,7 @@ namespace zmq
void plug (struct i_inout *inout_); void plug (struct i_inout *inout_);
void unplug (); void unplug ();
void revive (); void revive ();
void resume_input ();
void add_prefix (const blob_t &identity_); void add_prefix (const blob_t &identity_);
void trim_prefix (); void trim_prefix ();
......
...@@ -28,8 +28,12 @@ zmq::reader_t::reader_t (object_t *parent_, ...@@ -28,8 +28,12 @@ zmq::reader_t::reader_t (object_t *parent_,
peer (NULL), peer (NULL),
hwm (hwm_), hwm (hwm_),
lwm (lwm_), lwm (lwm_),
msgs_read (0),
endpoint (NULL) endpoint (NULL)
{ {
// Adjust lwm and hwm.
if (lwm == 0 || lwm > hwm)
lwm = hwm;
} }
zmq::reader_t::~reader_t () zmq::reader_t::~reader_t ()
...@@ -73,7 +77,9 @@ bool zmq::reader_t::read (zmq_msg_t *msg_) ...@@ -73,7 +77,9 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)
return false; return false;
} }
// TODO: Adjust the size of the pipe. msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0)
send_reader_info (peer, msgs_read);
return true; return true;
} }
...@@ -111,8 +117,14 @@ zmq::writer_t::writer_t (object_t *parent_, ...@@ -111,8 +117,14 @@ zmq::writer_t::writer_t (object_t *parent_,
peer (NULL), peer (NULL),
hwm (hwm_), hwm (hwm_),
lwm (lwm_), lwm (lwm_),
msgs_read (0),
msgs_written (0),
stalled (false),
endpoint (NULL) endpoint (NULL)
{ {
// Adjust lwm and hwm.
if (lwm == 0 || lwm > hwm)
lwm = hwm;
} }
void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
...@@ -131,32 +143,41 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_) ...@@ -131,32 +143,41 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_)
peer = &pipe->reader; peer = &pipe->reader;
} }
bool zmq::writer_t::check_write (uint64_t size_) bool zmq::writer_t::check_write ()
{ {
// TODO: Check whether hwm is exceeded. if (pipe_full ()) {
stalled = true;
return false;
}
return true; return true;
} }
bool zmq::writer_t::write (zmq_msg_t *msg_) bool zmq::writer_t::write (zmq_msg_t *msg_)
{ {
if (pipe_full ()) {
stalled = true;
return false;
}
pipe->write (*msg_); pipe->write (*msg_);
msgs_written++;
return true; return true;
// TODO: Adjust size of the pipe.
} }
void zmq::writer_t::rollback () void zmq::writer_t::rollback ()
{ {
while (true) {
zmq_msg_t msg; zmq_msg_t msg;
if (!pipe->unwrite (&msg))
break; while (pipe->unwrite (&msg)) {
zmq_msg_close (&msg); zmq_msg_close (&msg);
msgs_written--;
} }
// TODO: We don't have to inform the reader side of the pipe about if (stalled && endpoint != NULL && !pipe_full()) {
// the event. We'll simply adjust the pipe size and keep calm. stalled = false;
endpoint->revive (this);
}
} }
void zmq::writer_t::flush () void zmq::writer_t::flush ()
...@@ -179,6 +200,15 @@ void zmq::writer_t::term () ...@@ -179,6 +200,15 @@ void zmq::writer_t::term ()
pipe->flush (); pipe->flush ();
} }
void zmq::writer_t::process_reader_info (uint64_t msgs_read_)
{
msgs_read = msgs_read_;
if (stalled && endpoint != NULL) {
stalled = false;
endpoint->revive (this);
}
}
void zmq::writer_t::process_pipe_term () void zmq::writer_t::process_pipe_term ()
{ {
if (endpoint) if (endpoint)
...@@ -189,6 +219,11 @@ void zmq::writer_t::process_pipe_term () ...@@ -189,6 +219,11 @@ void zmq::writer_t::process_pipe_term ()
send_pipe_term_ack (p); send_pipe_term_ack (p);
} }
bool zmq::writer_t::pipe_full ()
{
return hwm > 0 && msgs_written - msgs_read == hwm;
}
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, uint64_t lwm_) : uint64_t hwm_, uint64_t lwm_) :
reader (reader_parent_, hwm_, lwm_), reader (reader_parent_, hwm_, lwm_),
......
...@@ -68,10 +68,8 @@ namespace zmq ...@@ -68,10 +68,8 @@ namespace zmq
uint64_t hwm; uint64_t hwm;
uint64_t lwm; uint64_t lwm;
// Positions of head and tail of the pipe (in bytes). // Number of messages read so far.
uint64_t head; uint64_t msgs_read;
uint64_t tail;
uint64_t last_sent_head;
// Endpoint (either session or socket) the pipe is attached to. // Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint; i_endpoint *endpoint;
...@@ -91,10 +89,10 @@ namespace zmq ...@@ -91,10 +89,10 @@ namespace zmq
void set_pipe (class pipe_t *pipe_); void set_pipe (class pipe_t *pipe_);
void set_endpoint (i_endpoint *endpoint_); void set_endpoint (i_endpoint *endpoint_);
// Checks whether message with specified size can be written to the // Checks whether a message can be written to the pipe.
// pipe. If writing the message would cause high watermark to be // If writing the message would cause high watermark to be
// exceeded, the function returns false. // exceeded, the function returns false.
bool check_write (uint64_t size_); bool check_write ();
// Writes a message to the underlying pipe. Returns false if the // Writes a message to the underlying pipe. Returns false if the
// message cannot be written because high watermark was reached. // message cannot be written because high watermark was reached.
...@@ -111,9 +109,14 @@ namespace zmq ...@@ -111,9 +109,14 @@ namespace zmq
private: private:
void process_reader_info (uint64_t msgs_read_);
// Command handlers. // Command handlers.
void process_pipe_term (); void process_pipe_term ();
// Tests whether the pipe is already full.
bool pipe_full ();
// The underlying pipe. // The underlying pipe.
class pipe_t *pipe; class pipe_t *pipe;
...@@ -124,9 +127,15 @@ namespace zmq ...@@ -124,9 +127,15 @@ namespace zmq
uint64_t hwm; uint64_t hwm;
uint64_t lwm; uint64_t lwm;
// Positions of head and tail of the pipe (in bytes). // Last confirmed number of messages read from the pipe.
uint64_t head; // The actual number can be higher.
uint64_t tail; uint64_t msgs_read;
// Number of messages we have written so far.
uint64_t msgs_written;
// True iff the last attempt to write a message has failed.
bool stalled;
// Endpoint (either session or socket) the pipe is attached to. // Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint; i_endpoint *endpoint;
......
...@@ -65,6 +65,11 @@ void zmq::pub_t::xrevive (class reader_t *pipe_) ...@@ -65,6 +65,11 @@ void zmq::pub_t::xrevive (class reader_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::pub_t::xrevive (class writer_t *pipe_)
{
zmq_not_implemented ();
}
int zmq::pub_t::xsetsockopt (int option_, const void *optval_, int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
...@@ -87,7 +92,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -87,7 +92,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// First check whether all pipes are available for writing. // First check whether all pipes are available for writing.
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) for (out_pipes_t::size_type i = 0; i != pipes_count; i++)
if (!out_pipes [i]->check_write (zmq_msg_size (msg_))) { if (!out_pipes [i]->check_write ()) {
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
...@@ -97,7 +102,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -97,7 +102,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// For VSMs the copying is straighforward. // For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) { if (content == (msg_content_t*) ZMQ_VSM) {
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
out_pipes [i]->write (msg_); bool written = out_pipes [i]->write (msg_);
zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH)) if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [i]->flush (); out_pipes [i]->flush ();
} }
...@@ -110,7 +116,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -110,7 +116,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// to send the message to - no refcount adjustment i.e. no atomic // to send the message to - no refcount adjustment i.e. no atomic
// operations are needed. // operations are needed.
if (pipes_count == 1) { if (pipes_count == 1) {
out_pipes [0]->write (msg_); bool written = out_pipes [0]->write (msg_);
zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH)) if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [0]->flush (); out_pipes [0]->flush ();
int rc = zmq_msg_init (msg_); int rc = zmq_msg_init (msg_);
...@@ -130,7 +137,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -130,7 +137,8 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// Push the message to all destinations. // Push the message to all destinations.
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
out_pipes [i]->write (msg_); bool written = out_pipes [i]->write (msg_);
zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH)) if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [i]->flush (); out_pipes [i]->flush ();
} }
......
...@@ -40,6 +40,7 @@ namespace zmq ...@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_); void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_); void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
......
...@@ -144,6 +144,11 @@ void zmq::rep_t::xrevive (class reader_t *pipe_) ...@@ -144,6 +144,11 @@ void zmq::rep_t::xrevive (class reader_t *pipe_)
active++; active++;
} }
void zmq::rep_t::xrevive (class writer_t *pipe_)
{
zmq_not_implemented ();
}
int zmq::rep_t::xsetsockopt (int option_, const void *optval_, int zmq::rep_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
...@@ -160,12 +165,13 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -160,12 +165,13 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
// TODO: Implement this once queue limits are in-place. If the reply // TODO: Implement this once queue limits are in-place. If the reply
// overloads the buffer, connection should be torn down. // overloads the buffer, connection should be torn down.
zmq_assert (reply_pipe->check_write (zmq_msg_size (msg_))); zmq_assert (reply_pipe->check_write ());
// Push message to the selected pipe. If requester have disconnected // Push message to the selected pipe. If requester have disconnected
// in the meantime, drop the reply. // in the meantime, drop the reply.
if (reply_pipe) { if (reply_pipe) {
reply_pipe->write (msg_); bool written = reply_pipe->write (msg_);
zmq_assert (written);
reply_pipe->flush (); reply_pipe->flush ();
} }
else { else {
......
...@@ -40,6 +40,7 @@ namespace zmq ...@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_); void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_); void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
......
...@@ -108,6 +108,11 @@ void zmq::req_t::xrevive (class reader_t *pipe_) ...@@ -108,6 +108,11 @@ void zmq::req_t::xrevive (class reader_t *pipe_)
reply_pipe_active = true; reply_pipe_active = true;
} }
void zmq::req_t::xrevive (class writer_t *pipe_)
{
zmq_not_implemented ();
}
int zmq::req_t::xsetsockopt (int option_, const void *optval_, int zmq::req_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
...@@ -142,11 +147,9 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -142,11 +147,9 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
current = 0; current = 0;
} }
// TODO: Implement this once queue limits are in-place.
zmq_assert (out_pipes [current]->check_write (zmq_msg_size (msg_)));
// Push message to the selected pipe. // Push message to the selected pipe.
out_pipes [current]->write (msg_); bool written = out_pipes [current]->write (msg_);
zmq_assert (written);
out_pipes [current]->flush (); out_pipes [current]->flush ();
waiting_for_reply = true; waiting_for_reply = true;
......
...@@ -40,6 +40,7 @@ namespace zmq ...@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_); void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_); void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
......
...@@ -164,6 +164,13 @@ void zmq::session_t::revive (reader_t *pipe_) ...@@ -164,6 +164,13 @@ void zmq::session_t::revive (reader_t *pipe_)
engine->revive (); engine->revive ();
} }
void zmq::session_t::revive (writer_t *pipe_)
{
zmq_assert (out_pipe == pipe_);
if (engine)
engine->resume_input ();
}
void zmq::session_t::process_plug () void zmq::session_t::process_plug ()
{ {
} }
......
...@@ -57,6 +57,7 @@ namespace zmq ...@@ -57,6 +57,7 @@ namespace zmq
void detach_outpipe (class writer_t *pipe_); void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_); void kill (class reader_t *pipe_);
void revive (class reader_t *pipe_); void revive (class reader_t *pipe_);
void revive (class writer_t *pipe_);
private: private:
......
...@@ -552,6 +552,11 @@ void zmq::socket_base_t::revive (reader_t *pipe_) ...@@ -552,6 +552,11 @@ void zmq::socket_base_t::revive (reader_t *pipe_)
xrevive (pipe_); xrevive (pipe_);
} }
void zmq::socket_base_t::revive (writer_t *pipe_)
{
xrevive (pipe_);
}
void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_) class writer_t *outpipe_, const blob_t &peer_identity_)
{ {
......
...@@ -93,6 +93,7 @@ namespace zmq ...@@ -93,6 +93,7 @@ namespace zmq
void detach_outpipe (class writer_t *pipe_); void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_); void kill (class reader_t *pipe_);
void revive (class reader_t *pipe_); void revive (class reader_t *pipe_);
void revive (class writer_t *pipe_);
protected: protected:
...@@ -106,6 +107,7 @@ namespace zmq ...@@ -106,6 +107,7 @@ namespace zmq
virtual void xdetach_outpipe (class writer_t *pipe_) = 0; virtual void xdetach_outpipe (class writer_t *pipe_) = 0;
virtual void xkill (class reader_t *pipe_) = 0; virtual void xkill (class reader_t *pipe_) = 0;
virtual void xrevive (class reader_t *pipe_) = 0; virtual void xrevive (class reader_t *pipe_) = 0;
virtual void xrevive (class writer_t *pipe_) = 0;
// Actual algorithms are to be defined by individual socket types. // Actual algorithms are to be defined by individual socket types.
virtual int xsetsockopt (int option_, const void *optval_, virtual int xsetsockopt (int option_, const void *optval_,
......
...@@ -67,6 +67,11 @@ void zmq::sub_t::xrevive (class reader_t *pipe_) ...@@ -67,6 +67,11 @@ void zmq::sub_t::xrevive (class reader_t *pipe_)
fq.revive (pipe_); fq.revive (pipe_);
} }
void zmq::sub_t::xrevive (class writer_t *pipe_)
{
zmq_assert (false);
}
int zmq::sub_t::xsetsockopt (int option_, const void *optval_, int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
......
...@@ -45,6 +45,7 @@ namespace zmq ...@@ -45,6 +45,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_); void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_); void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
......
...@@ -62,6 +62,11 @@ void zmq::upstream_t::xrevive (class reader_t *pipe_) ...@@ -62,6 +62,11 @@ void zmq::upstream_t::xrevive (class reader_t *pipe_)
fq.revive (pipe_); fq.revive (pipe_);
} }
void zmq::upstream_t::xrevive (class writer_t *pipe_)
{
zmq_assert (false);
}
int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
......
...@@ -40,6 +40,7 @@ namespace zmq ...@@ -40,6 +40,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_); void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_); void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
......
...@@ -81,6 +81,11 @@ void zmq::xrep_t::xrevive (class reader_t *pipe_) ...@@ -81,6 +81,11 @@ void zmq::xrep_t::xrevive (class reader_t *pipe_)
fq.revive (pipe_); fq.revive (pipe_);
} }
void zmq::xrep_t::xrevive (class writer_t *pipe_)
{
zmq_not_implemented ();
}
int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
...@@ -111,7 +116,8 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -111,7 +116,8 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
} }
// Push message to the selected pipe. // Push message to the selected pipe.
it->second->write (msg_); bool written = it->second->write (msg_);
zmq_assert (written);
it->second->flush (); it->second->flush ();
// Detach the message from the data buffer. // Detach the message from the data buffer.
......
...@@ -43,6 +43,7 @@ namespace zmq ...@@ -43,6 +43,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_); void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_); void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
......
...@@ -63,6 +63,11 @@ void zmq::xreq_t::xrevive (class reader_t *pipe_) ...@@ -63,6 +63,11 @@ void zmq::xreq_t::xrevive (class reader_t *pipe_)
fq.revive (pipe_); fq.revive (pipe_);
} }
void zmq::xreq_t::xrevive (class writer_t *pipe_)
{
zmq_not_implemented ();
}
int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
...@@ -97,4 +102,3 @@ bool zmq::xreq_t::xhas_out () ...@@ -97,4 +102,3 @@ bool zmq::xreq_t::xhas_out ()
return lb.has_out (); return lb.has_out ();
} }
...@@ -41,6 +41,7 @@ namespace zmq ...@@ -41,6 +41,7 @@ namespace zmq
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_); void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_); void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
......
...@@ -102,19 +102,19 @@ void zmq::zmq_engine_t::in_event () ...@@ -102,19 +102,19 @@ void zmq::zmq_engine_t::in_event ()
// Push the data to the decoder. // Push the data to the decoder.
size_t processed = decoder.process_buffer (inpos, insize); size_t processed = decoder.process_buffer (inpos, insize);
// Adjust the buffer.
inpos += processed;
insize -= processed;
// Stop polling for input if we got stuck. // Stop polling for input if we got stuck.
if (processed < insize) { if (processed < insize) {
// This may happen if queue limits are implemented or when // This may happen if queue limits are in effect or when
// init object reads all required information from the socket // init object reads all required information from the socket
// and rejects to read more data. // and rejects to read more data.
reset_pollin (handle); reset_pollin (handle);
} }
// Adjust the buffer.
inpos += processed;
insize -= processed;
// Flush all messages the decoder may have produced. // Flush all messages the decoder may have produced.
inout->flush (); inout->flush ();
...@@ -162,6 +162,13 @@ void zmq::zmq_engine_t::revive () ...@@ -162,6 +162,13 @@ void zmq::zmq_engine_t::revive ()
out_event (); out_event ();
} }
void zmq::zmq_engine_t::resume_input ()
{
set_pollin (handle);
in_event ();
}
void zmq::zmq_engine_t::add_prefix (const blob_t &identity_) void zmq::zmq_engine_t::add_prefix (const blob_t &identity_)
{ {
decoder.add_prefix (identity_); decoder.add_prefix (identity_);
......
...@@ -47,6 +47,7 @@ namespace zmq ...@@ -47,6 +47,7 @@ namespace zmq
void plug (struct i_inout *inout_); void plug (struct i_inout *inout_);
void unplug (); void unplug ();
void revive (); void revive ();
void resume_input ();
void add_prefix (const blob_t &identity_); void add_prefix (const blob_t &identity_);
void trim_prefix (); void trim_prefix ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment