Commit fd707fed authored by Martin Hurton's avatar Martin Hurton

issue 38 - Assertion failed: fetched (xrep.cpp:196)

parent 805af824
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
zmq::xrep_t::xrep_t (class app_thread_t *parent_) : zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_), socket_base_t (parent_),
current_in (0), current_in (0),
prefetched (false),
more_in (false), more_in (false),
current_out (NULL), current_out (NULL),
more_out (false) more_out (false)
...@@ -189,6 +190,13 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -189,6 +190,13 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
// Deallocate old content of the message. // Deallocate old content of the message.
zmq_msg_close (msg_); zmq_msg_close (msg_);
if (prefetched) {
zmq_msg_move (msg_, &prefetched_msg);
more_in = msg_->flags & ZMQ_MSG_MORE;
prefetched = false;
return 0;
}
// If we are in the middle of reading a message, just grab next part of it. // If we are in the middle of reading a message, just grab next part of it.
if (more_in) { if (more_in) {
zmq_assert (inpipes [current_in].active); zmq_assert (inpipes [current_in].active);
...@@ -207,21 +215,17 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -207,21 +215,17 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
for (int count = inpipes.size (); count != 0; count--) { for (int count = inpipes.size (); count != 0; count--) {
// Try to fetch new message. // Try to fetch new message.
bool fetched; if (inpipes [current_in].active)
if (!inpipes [current_in].active) prefetched = inpipes [current_in].reader->read (&prefetched_msg);
fetched = false;
else
fetched = inpipes [current_in].reader->check_read ();
// If we have a message, create a prefix and return it to the caller. // If we have a message, create a prefix and return it to the caller.
if (fetched) { if (prefetched) {
int rc = zmq_msg_init_size (msg_, int rc = zmq_msg_init_size (msg_,
inpipes [current_in].identity.size ()); inpipes [current_in].identity.size ());
zmq_assert (rc == 0); zmq_assert (rc == 0);
memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (), memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (),
zmq_msg_size (msg_)); zmq_msg_size (msg_));
msg_->flags = ZMQ_MSG_MORE; msg_->flags = ZMQ_MSG_MORE;
more_in = true;
return 0; return 0;
} }
...@@ -241,7 +245,7 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -241,7 +245,7 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
bool zmq::xrep_t::xhas_in () bool zmq::xrep_t::xhas_in ()
{ {
// There are subsequent parts of the partly-read message available. // There are subsequent parts of the partly-read message available.
if (more_in) if (prefetched || more_in)
return true; return true;
// Note that messing with current doesn't break the fairness of fair // Note that messing with current doesn't break the fairness of fair
......
...@@ -67,6 +67,12 @@ namespace zmq ...@@ -67,6 +67,12 @@ namespace zmq
// The pipe we are currently reading from. // The pipe we are currently reading from.
inpipes_t::size_type current_in; inpipes_t::size_type current_in;
// Have we prefetched a message.
bool prefetched;
// Holds the prefetched message.
zmq_msg_t prefetched_msg;
// If true, more incoming message parts are expected. // If true, more incoming message parts are expected.
bool more_in; bool more_in;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment