Commit da1ef4d2 authored by Pieter Hintjens's avatar Pieter Hintjens Committed by Martin Sustrik

Fixed REP assert on missing envelope

Signed-off-by: 's avatarPieter Hintjens <ph@imatix.com>
parent 0c5b781e
......@@ -67,24 +67,38 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
if (request_begins) {
// Copy the backtrace stack to the reply pipe.
bool bottom = false;
while (!bottom) {
while (true) {
// TODO: What if request can be read but reply pipe is not
// ready for writing?
// TODO: If request can be read but reply pipe is not
// ready for writing, we should drop the reply.
// Get next part of the backtrace stack.
int rc = xrep_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
zmq_assert (msg_->flags () & msg_t::more);
// Empty message part delimits the traceback stack.
bottom = (msg_->size () == 0);
if (msg_->flags () & msg_t::more) {
// Push it to the reply pipe.
rc = xrep_t::xsend (msg_, flags_);
zmq_assert (rc == 0);
// Empty message part delimits the traceback stack.
bool bottom = (msg_->size () == 0);
// Push it to the reply pipe.
rc = xrep_t::xsend (msg_, flags_);
zmq_assert (rc == 0);
// The end of the traceback, move to processing message body.
if (bottom)
break;
}
else {
// If the traceback stack is malformed, discard anything
// already sent to pipe (we're at end of invalid message)
// and continue reading -- that'll switch us to the next pipe
// and next request.
rc = xrep_t::rollback ();
zmq_assert (rc == 0);
}
}
request_begins = false;
......
......@@ -61,7 +61,7 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
if (terminating) {
register_term_acks (1);
outpipe_->terminate ();
outpipe_->terminate ();
}
}
......@@ -289,6 +289,17 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
return -1;
}
int zmq::xrep_t::rollback (void)
{
if (current_out) {
current_out->rollback ();
current_out = NULL;
more_out = false;
}
return 0;
}
bool zmq::xrep_t::xhas_in ()
{
// There are subsequent parts of the partly-read message available.
......
......@@ -51,6 +51,11 @@ namespace zmq
bool xhas_in ();
bool xhas_out ();
protected:
// Rollback any message parts that were sent but not yet flushed.
int rollback ();
private:
// Hook into the termination process.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment