Commit 804bce82 authored by somdoron's avatar somdoron

Fix pipe terimation in router while reading message

parent 4f7dc496
...@@ -39,6 +39,8 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -39,6 +39,8 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_), socket_base_t (parent_, tid_, sid_),
prefetched (false), prefetched (false),
identity_sent (false), identity_sent (false),
current_in (NULL),
terminate_current_in (false),
more_in (false), more_in (false),
current_out (NULL), current_out (NULL),
more_out (false), more_out (false),
...@@ -295,6 +297,14 @@ int zmq::router_t::xrecv (msg_t *msg_) ...@@ -295,6 +297,14 @@ int zmq::router_t::xrecv (msg_t *msg_)
prefetched = false; prefetched = false;
} }
more_in = msg_->flags () & msg_t::more ? true : false; more_in = msg_->flags () & msg_t::more ? true : false;
if (!more_in) {
if (terminate_current_in) {
current_in->terminate (true);
terminate_current_in = false;
}
current_in = NULL;
}
return 0; return 0;
} }
...@@ -313,8 +323,17 @@ int zmq::router_t::xrecv (msg_t *msg_) ...@@ -313,8 +323,17 @@ int zmq::router_t::xrecv (msg_t *msg_)
zmq_assert (pipe != NULL); zmq_assert (pipe != NULL);
// If we are in the middle of reading a message, just return the next part. // If we are in the middle of reading a message, just return the next part.
if (more_in) if (more_in) {
more_in = msg_->flags () & msg_t::more ? true : false; more_in = msg_->flags () & msg_t::more ? true : false;
if (!more_in) {
if (terminate_current_in) {
current_in->terminate (true);
terminate_current_in = false;
}
current_in = NULL;
}
}
else { else {
// We are at the beginning of a message. // We are at the beginning of a message.
// Keep the message part we have in the prefetch buffer // Keep the message part we have in the prefetch buffer
...@@ -322,6 +341,7 @@ int zmq::router_t::xrecv (msg_t *msg_) ...@@ -322,6 +341,7 @@ int zmq::router_t::xrecv (msg_t *msg_)
rc = prefetched_msg.move (*msg_); rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0); errno_assert (rc == 0);
prefetched = true; prefetched = true;
current_in = pipe;
blob_t identity = pipe->get_identity (); blob_t identity = pipe->get_identity ();
rc = msg_->init_size (identity.size ()); rc = msg_->init_size (identity.size ());
...@@ -382,6 +402,7 @@ bool zmq::router_t::xhas_in () ...@@ -382,6 +402,7 @@ bool zmq::router_t::xhas_in ()
prefetched = true; prefetched = true;
identity_sent = false; identity_sent = false;
current_in = pipe;
return true; return true;
} }
...@@ -466,6 +487,9 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -466,6 +487,9 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
// connection to take the identity. // connection to take the identity.
outpipes.erase (it); outpipes.erase (it);
if (existing_outpipe.pipe == current_in)
terminate_current_in = true;
else
existing_outpipe.pipe->terminate (true); existing_outpipe.pipe->terminate (true);
} }
} }
......
...@@ -92,6 +92,12 @@ namespace zmq ...@@ -92,6 +92,12 @@ namespace zmq
// Holds the prefetched message. // Holds the prefetched message.
msg_t prefetched_msg; msg_t prefetched_msg;
// The pipe we are currently reading from
zmq::pipe_t *current_in;
// Should current_in should be terminate after all parts received?
bool terminate_current_in;
// If true, more incoming message parts are expected. // If true, more incoming message parts are expected.
bool more_in; bool more_in;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment