Commit 91fdedf2 authored by Martin Sustrik's avatar Martin Sustrik

Fix polling on XREP socket

When polling on XREP socket in incoming message part was prefetched,
but not the identity of sender. The problem is fixed by this patch.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent f9eb7632
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_),
prefetched (false), prefetched (0),
more_in (false), more_in (false),
current_out (NULL), current_out (NULL),
more_out (false), more_out (false),
...@@ -180,12 +180,23 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) ...@@ -180,12 +180,23 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
{ {
// if there is a prefetched identity, return it.
if (prefetched == 2)
{
int rc = msg_->init_size (prefetched_id.size ());
errno_assert (rc == 0);
memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ());
msg_->set_flags (msg_t::more);
prefetched = 1;
return 0;
}
// If there is a prefetched message, return it. // If there is a prefetched message, return it.
if (prefetched) { if (prefetched == 1) {
int rc = msg_->move (prefetched_msg); int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0); errno_assert (rc == 0);
more_in = msg_->flags () & msg_t::more ? true : false; more_in = msg_->flags () & msg_t::more ? true : false;
prefetched = false; prefetched = 0;
return 0; return 0;
} }
...@@ -235,7 +246,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) ...@@ -235,7 +246,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// have to the prefetched and return the ID of the peer instead. // have to the prefetched and return the ID of the peer instead.
int rc = prefetched_msg.move (*msg_); int rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0); errno_assert (rc == 0);
prefetched = true; prefetched = 1;
rc = msg_->close (); rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -259,16 +270,32 @@ int zmq::xrep_t::rollback (void) ...@@ -259,16 +270,32 @@ int zmq::xrep_t::rollback (void)
bool zmq::xrep_t::xhas_in () bool zmq::xrep_t::xhas_in ()
{ {
// If we are in the middle of reading the messages, there are
// definitely more parts available.
if (more_in)
return true;
// We may already have a message pre-fetched. // We may already have a message pre-fetched.
if (prefetched) if (prefetched > 0)
return true; return true;
// Try to read the next message to the pre-fetch buffer. // Try to read the next message to the pre-fetch buffer. If anything,
int rc = xrep_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT); // it will be identity of the peer sending the message.
if (rc != 0 && errno == EAGAIN) msg_t id;
id.init ();
int rc = xrep_t::xrecv (&id, ZMQ_DONTWAIT);
if (rc != 0 && errno == EAGAIN) {
id.close ();
return false; return false;
}
zmq_assert (rc == 0); zmq_assert (rc == 0);
prefetched = true;
// We have first part of the message prefetched now. We will store the
// prefetched identity as well.
prefetched_id.assign ((unsigned char*) id.data (), id.size ());
id.close ();
prefetched = 2;
return true; return true;
} }
......
...@@ -67,8 +67,12 @@ namespace zmq ...@@ -67,8 +67,12 @@ namespace zmq
// Fair queueing object for inbound pipes. // Fair queueing object for inbound pipes.
fq_t fq; fq_t fq;
// Have we prefetched a message. // This value is either 0 (nothing is prefetched), 1 (only message body
bool prefetched; // is prefetched) or 2 (both identity and message body are prefetched).
int prefetched;
// Holds the prefetched identity.
blob_t prefetched_id;
// Holds the prefetched message. // Holds the prefetched message.
msg_t prefetched_msg; msg_t prefetched_msg;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment