Commit 00dc0245 authored by Martin Sustrik's avatar Martin Sustrik

Race condition in pipe_t fixed.

pipe_t now correctly drops pointer to the underlying pipe when
sending pipe_term_ack command.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 6052709c
...@@ -165,11 +165,13 @@ void zmq::pipe_t::rollback () ...@@ -165,11 +165,13 @@ void zmq::pipe_t::rollback ()
{ {
// Remove incomplete message from the outbound pipe. // Remove incomplete message from the outbound pipe.
msg_t msg; msg_t msg;
if (outpipe) {
while (outpipe->unwrite (&msg)) { while (outpipe->unwrite (&msg)) {
zmq_assert (msg.flags () & msg_t::more); zmq_assert (msg.flags () & msg_t::more);
int rc = msg.close (); int rc = msg.close ();
errno_assert (rc == 0); errno_assert (rc == 0);
} }
}
} }
void zmq::pipe_t::flush () void zmq::pipe_t::flush ()
...@@ -238,6 +240,7 @@ void zmq::pipe_t::process_pipe_term () ...@@ -238,6 +240,7 @@ void zmq::pipe_t::process_pipe_term ()
if (state == active) { if (state == active) {
if (!delay) { if (!delay) {
state = terminating; state = terminating;
outpipe = NULL;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
return; return;
} }
...@@ -251,6 +254,7 @@ void zmq::pipe_t::process_pipe_term () ...@@ -251,6 +254,7 @@ void zmq::pipe_t::process_pipe_term ()
// term command as well, so we can move straight to terminating state. // term command as well, so we can move straight to terminating state.
if (state == delimited) { if (state == delimited) {
state = terminating; state = terminating;
outpipe = NULL;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
return; return;
} }
...@@ -260,6 +264,7 @@ void zmq::pipe_t::process_pipe_term () ...@@ -260,6 +264,7 @@ void zmq::pipe_t::process_pipe_term ()
// own ack. // own ack.
if (state == terminated) { if (state == terminated) {
state = double_terminated; state = double_terminated;
outpipe = NULL;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
return; return;
} }
...@@ -280,8 +285,10 @@ void zmq::pipe_t::process_pipe_term_ack () ...@@ -280,8 +285,10 @@ void zmq::pipe_t::process_pipe_term_ack ()
// are invalid. // are invalid.
if (state == terminating) ; if (state == terminating) ;
else if (state == double_terminated); else if (state == double_terminated);
else if (state == terminated) else if (state == terminated) {
outpipe = NULL;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
}
else else
zmq_assert (false); zmq_assert (false);
...@@ -325,6 +332,7 @@ void zmq::pipe_t::terminate (bool delay_) ...@@ -325,6 +332,7 @@ void zmq::pipe_t::terminate (bool delay_)
// There are still pending messages available, but the user calls // There are still pending messages available, but the user calls
// 'terminate'. We can act as if all the pending messages were read. // 'terminate'. We can act as if all the pending messages were read.
else if (state == pending && !delay) { else if (state == pending && !delay) {
outpipe = NULL;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
state = terminating; state = terminating;
} }
...@@ -348,6 +356,8 @@ void zmq::pipe_t::terminate (bool delay_) ...@@ -348,6 +356,8 @@ void zmq::pipe_t::terminate (bool delay_)
// Stop outbound flow of messages. // Stop outbound flow of messages.
out_active = false; out_active = false;
if (outpipe) {
// Rollback any unfinished outbound messages. // Rollback any unfinished outbound messages.
rollback (); rollback ();
...@@ -357,6 +367,7 @@ void zmq::pipe_t::terminate (bool delay_) ...@@ -357,6 +367,7 @@ void zmq::pipe_t::terminate (bool delay_)
msg.init_delimiter (); msg.init_delimiter ();
outpipe->write (msg, false); outpipe->write (msg, false);
flush (); flush ();
}
} }
bool zmq::pipe_t::is_delimiter (msg_t &msg_) bool zmq::pipe_t::is_delimiter (msg_t &msg_)
...@@ -400,6 +411,7 @@ void zmq::pipe_t::delimit () ...@@ -400,6 +411,7 @@ void zmq::pipe_t::delimit ()
} }
if (state == pending) { if (state == pending) {
outpipe = NULL;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
state = terminating; state = terminating;
return; return;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment