Commit ba1ae7d6 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #595 from hurtonm/master

Another ZMQ_STREAM simplification
parents fe2753da 7b27c125
...@@ -28,7 +28,6 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -28,7 +28,6 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_), socket_base_t (parent_, tid_, sid_),
prefetched (false), prefetched (false),
identity_sent (false), identity_sent (false),
more_in (false),
current_out (NULL), current_out (NULL),
more_out (false), more_out (false),
next_peer_id (generate_random ()) next_peer_id (generate_random ())
...@@ -97,8 +96,6 @@ int zmq::stream_t::xsend (msg_t *msg_) ...@@ -97,8 +96,6 @@ int zmq::stream_t::xsend (msg_t *msg_)
// TODO: The connections should be killed instead. // TODO: The connections should be killed instead.
if (msg_->flags () & msg_t::more) { if (msg_->flags () & msg_t::more) {
more_out = true;
// Find the pipe associated with the identity stored in the prefix. // Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe return an error // If there's no such pipe return an error
blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
...@@ -109,18 +106,19 @@ int zmq::stream_t::xsend (msg_t *msg_) ...@@ -109,18 +106,19 @@ int zmq::stream_t::xsend (msg_t *msg_)
if (!current_out->check_write ()) { if (!current_out->check_write ()) {
it->second.active = false; it->second.active = false;
current_out = NULL; current_out = NULL;
more_out = false;
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
} }
else { else {
more_out = false;
errno = EHOSTUNREACH; errno = EHOSTUNREACH;
return -1; return -1;
} }
} }
// Expect one more message frame.
more_out = true;
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
rc = msg_->init (); rc = msg_->init ();
...@@ -131,8 +129,8 @@ int zmq::stream_t::xsend (msg_t *msg_) ...@@ -131,8 +129,8 @@ int zmq::stream_t::xsend (msg_t *msg_)
// Ignore the MORE flag // Ignore the MORE flag
msg_->reset_flags (msg_t::more); msg_->reset_flags (msg_t::more);
// Check whether this is the last part of the message. // This is the last part of the message.
more_out = msg_->flags () & msg_t::more ? true : false; more_out = false;
// Push the message into the pipe. If there's no out pipe, just drop it. // Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) { if (current_out) {
...@@ -148,13 +146,9 @@ int zmq::stream_t::xsend (msg_t *msg_) ...@@ -148,13 +146,9 @@ int zmq::stream_t::xsend (msg_t *msg_)
return 0; return 0;
} }
bool ok = current_out->write (msg_); bool ok = current_out->write (msg_);
if (unlikely (!ok)) if (likely (ok))
current_out = NULL;
else
if (!more_out) {
current_out->flush (); current_out->flush ();
current_out = NULL; current_out = NULL;
}
} }
else { else {
int rc = msg_->close (); int rc = msg_->close ();
...@@ -181,46 +175,34 @@ int zmq::stream_t::xrecv (msg_t *msg_) ...@@ -181,46 +175,34 @@ int zmq::stream_t::xrecv (msg_t *msg_)
errno_assert (rc == 0); errno_assert (rc == 0);
prefetched = false; prefetched = false;
} }
more_in = msg_->flags () & msg_t::more ? true : false;
return 0; return 0;
} }
pipe_t *pipe = NULL; pipe_t *pipe = NULL;
int rc = fq.recvpipe (msg_, &pipe); int rc = fq.recvpipe (&prefetched_msg, &pipe);
if (rc != 0) if (rc != 0)
return -1; return -1;
zmq_assert (pipe != NULL); zmq_assert (pipe != NULL);
zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
// If we are in the middle of reading a message, just return the next part. // We have received a frame with TCP data.
if (more_in) // Rather than sendig this frame, we keep it in prefetched
more_in = msg_->flags () & msg_t::more ? true : false; // buffer and send a frame with peer's ID.
else { blob_t identity = pipe->get_identity ();
// We are at the beginning of a message. rc = msg_->init_size (identity.size ());
// Keep the message part we have in the prefetch buffer errno_assert (rc == 0);
// and return the ID of the peer instead. memcpy (msg_->data (), identity.data (), identity.size ());
rc = prefetched_msg.move (*msg_); msg_->set_flags (msg_t::more);
errno_assert (rc == 0);
prefetched = true;
blob_t identity = pipe->get_identity (); prefetched = true;
rc = msg_->init_size (identity.size ()); identity_sent = true;
errno_assert (rc == 0);
memcpy (msg_->data (), identity.data (), identity.size ());
msg_->set_flags (msg_t::more);
identity_sent = true;
}
return 0; return 0;
} }
bool zmq::stream_t::xhas_in () bool zmq::stream_t::xhas_in ()
{ {
// If we are in the middle of reading the messages, there are
// definitely more parts available.
if (more_in)
return true;
// We may already have a message pre-fetched. // We may already have a message pre-fetched.
if (prefetched) if (prefetched)
return true; return true;
...@@ -233,6 +215,7 @@ bool zmq::stream_t::xhas_in () ...@@ -233,6 +215,7 @@ bool zmq::stream_t::xhas_in ()
return false; return false;
zmq_assert (pipe != NULL); zmq_assert (pipe != NULL);
zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
blob_t identity = pipe->get_identity (); blob_t identity = pipe->get_identity ();
rc = prefetched_id.init_size (identity.size ()); rc = prefetched_id.init_size (identity.size ());
......
...@@ -68,9 +68,6 @@ namespace zmq ...@@ -68,9 +68,6 @@ namespace zmq
// Holds the prefetched message. // Holds the prefetched message.
msg_t prefetched_msg; msg_t prefetched_msg;
// If true, more incoming message parts are expected.
bool more_in;
struct outpipe_t struct outpipe_t
{ {
zmq::pipe_t *pipe; zmq::pipe_t *pipe;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment