Commit 4e4803e0 authored by Martin Hurton's avatar Martin Hurton

Rename pipe states to make it more mnemonic

parent f781eb7e
...@@ -96,7 +96,9 @@ zmq::blob_t zmq::pipe_t::get_identity () ...@@ -96,7 +96,9 @@ zmq::blob_t zmq::pipe_t::get_identity ()
bool zmq::pipe_t::check_read () bool zmq::pipe_t::check_read ()
{ {
if (unlikely (!in_active || (state != active && state != pending))) if (unlikely (!in_active))
return false;
if (unlikely (state != active && state != waiting_for_delimiter))
return false; return false;
// Check if there's an item in the pipe. // Check if there's an item in the pipe.
...@@ -120,7 +122,9 @@ bool zmq::pipe_t::check_read () ...@@ -120,7 +122,9 @@ bool zmq::pipe_t::check_read ()
bool zmq::pipe_t::read (msg_t *msg_) bool zmq::pipe_t::read (msg_t *msg_)
{ {
if (unlikely (!in_active || (state != active && state != pending))) if (unlikely (!in_active))
return false;
if (unlikely (state != active && state != waiting_for_delimiter))
return false; return false;
if (!inpipe->read (msg_)) { if (!inpipe->read (msg_)) {
...@@ -187,7 +191,7 @@ void zmq::pipe_t::rollback () ...@@ -187,7 +191,7 @@ void zmq::pipe_t::rollback ()
void zmq::pipe_t::flush () void zmq::pipe_t::flush ()
{ {
// The peer does not exist anymore at this point. // The peer does not exist anymore at this point.
if (state == terminating) if (state == term_ack_sent)
return; return;
if (outpipe && !outpipe->flush ()) if (outpipe && !outpipe->flush ())
...@@ -196,7 +200,7 @@ void zmq::pipe_t::flush () ...@@ -196,7 +200,7 @@ void zmq::pipe_t::flush ()
void zmq::pipe_t::process_activate_read () void zmq::pipe_t::process_activate_read ()
{ {
if (!in_active && (state == active || state == pending)) { if (!in_active && (state == active || state == waiting_for_delimiter)) {
in_active = true; in_active = true;
sink->read_activated (this); sink->read_activated (this);
} }
...@@ -240,24 +244,24 @@ void zmq::pipe_t::process_pipe_term () ...@@ -240,24 +244,24 @@ void zmq::pipe_t::process_pipe_term ()
{ {
// This is the simple case of peer-induced termination. If there are no // This is the simple case of peer-induced termination. If there are no
// more pending messages to read, or if the pipe was configured to drop // more pending messages to read, or if the pipe was configured to drop
// pending messages, we can move directly to the terminating state. // pending messages, we can move directly to the term_ack_sent state.
// Otherwise we'll hang up in pending state till all the pending messages // Otherwise we'll hang up in waiting_for_delimiter state till all
// are sent. // pending messages are read.
if (state == active) { if (state == active) {
if (!delay) { if (!delay) {
state = terminating; state = term_ack_sent;
outpipe = NULL; outpipe = NULL;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
} }
else else
state = pending; state = waiting_for_delimiter;
return; return;
} }
// Delimiter happened to arrive before the term command. Now we have the // Delimiter happened to arrive before the term command. Now we have the
// term command as well, so we can move straight to terminating state. // term command as well, so we can move straight to term_ack_sent state.
if (state == delimited) { if (state == delimiter_received) {
state = terminating; state = term_ack_sent;
outpipe = NULL; outpipe = NULL;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
return; return;
...@@ -266,8 +270,8 @@ void zmq::pipe_t::process_pipe_term () ...@@ -266,8 +270,8 @@ void zmq::pipe_t::process_pipe_term ()
// This is the case where both ends of the pipe are closed in parallel. // This is the case where both ends of the pipe are closed in parallel.
// We simply reply to the request by ack and continue waiting for our // We simply reply to the request by ack and continue waiting for our
// own ack. // own ack.
if (state == terminated) { if (state == term_req_sent1) {
state = double_terminated; state = term_req_sent2;
outpipe = NULL; outpipe = NULL;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
return; return;
...@@ -283,16 +287,16 @@ void zmq::pipe_t::process_pipe_term_ack () ...@@ -283,16 +287,16 @@ void zmq::pipe_t::process_pipe_term_ack ()
zmq_assert (sink); zmq_assert (sink);
sink->terminated (this); sink->terminated (this);
// In terminating and double_terminated states there's nothing to do. // In term_ack_sent and term_req_sent2 states there's nothing to do.
// Simply deallocate the pipe. In terminated state we have to ack the // Simply deallocate the pipe. In term_req_sent1 state we have to ack
// peer before deallocating this side of the pipe. All the other states // the peer before deallocating this side of the pipe.
// are invalid. // All the other states are invalid.
if (state == terminated) { if (state == term_req_sent1) {
outpipe = NULL; outpipe = NULL;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
} }
else else
zmq_assert (state == terminating || state == double_terminated); zmq_assert (state == term_ack_sent || state == term_req_sent2);
// We'll deallocate the inbound pipe, the peer will deallocate the outbound // We'll deallocate the inbound pipe, the peer will deallocate the outbound
// pipe (which is an inbound pipe from its point of view). // pipe (which is an inbound pipe from its point of view).
...@@ -316,13 +320,13 @@ void zmq::pipe_t::terminate (bool delay_) ...@@ -316,13 +320,13 @@ void zmq::pipe_t::terminate (bool delay_)
delay = delay_; delay = delay_;
// If terminate was already called, we can ignore the duplicit invocation. // If terminate was already called, we can ignore the duplicit invocation.
if (state == terminated || state == double_terminated) if (state == term_req_sent1 || state == term_req_sent2)
return; return;
// If the pipe is in the final phase of async termination, it's going to // If the pipe is in the final phase of async termination, it's going to
// closed anyway. No need to do anything special here. // closed anyway. No need to do anything special here.
else else
if (state == terminating) if (state == term_ack_sent)
return; return;
// The simple sync termination case. Ask the peer to terminate and wait // The simple sync termination case. Ask the peer to terminate and wait
...@@ -330,30 +334,30 @@ void zmq::pipe_t::terminate (bool delay_) ...@@ -330,30 +334,30 @@ void zmq::pipe_t::terminate (bool delay_)
else else
if (state == active) { if (state == active) {
send_pipe_term (peer); send_pipe_term (peer);
state = terminated; state = term_req_sent1;
} }
// There are still pending messages available, but the user calls // There are still pending messages available, but the user calls
// 'terminate'. We can act as if all the pending messages were read. // 'terminate'. We can act as if all the pending messages were read.
else else
if (state == pending && !delay) { if (state == waiting_for_delimiter && delay == 0) {
outpipe = NULL; outpipe = NULL;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
state = terminating; state = term_ack_sent;
} }
// If there are pending messages still availabe, do nothing. // If there are pending messages still availabe, do nothing.
else else
if (state == pending) { if (state == waiting_for_delimiter) {
} }
// We've already got delimiter, but not term command yet. We can ignore // We've already got delimiter, but not term command yet. We can ignore
// the delimiter and ack synchronously terminate as if we were in // the delimiter and ack synchronously terminate as if we were in
// active state. // active state.
else else
if (state == delimited) { if (state == delimiter_received) {
send_pipe_term (peer); send_pipe_term (peer);
state = terminated; state = term_req_sent1;
} }
// There are no other states. // There are no other states.
...@@ -413,14 +417,14 @@ int zmq::pipe_t::compute_lwm (int hwm_) ...@@ -413,14 +417,14 @@ int zmq::pipe_t::compute_lwm (int hwm_)
void zmq::pipe_t::delimit () void zmq::pipe_t::delimit ()
{ {
if (state == active) { if (state == active) {
state = delimited; state = delimiter_received;
return; return;
} }
if (state == pending) { if (state == waiting_for_delimiter) {
outpipe = NULL; outpipe = NULL;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
state = terminating; state = term_ack_sent;
return; return;
} }
......
...@@ -162,22 +162,24 @@ namespace zmq ...@@ -162,22 +162,24 @@ namespace zmq
// Sink to send events to. // Sink to send events to.
i_pipe_events *sink; i_pipe_events *sink;
// State of the pipe endpoint. Active is common state before any // States of the pipe endpoint:
// termination begins. Delimited means that delimiter was read from // active: common state before any termination begins,
// pipe before term command was received. Pending means that term // delimiter_received: delimiter was read from pipe before
// command was already received from the peer but there are still // term command was received,
// pending messages to read. Terminating means that all pending // waiting_fo_delimiter: term command was already received
// messages were already read and all we are waiting for is ack from // from the peer but there are still pending messages to read,
// the peer. Terminated means that 'terminate' was explicitly called // term_ack_sent: all pending messages were already read and
// by the user. Double_terminated means that user called 'terminate' // all we are waiting for is ack from the peer,
// and then we've got term command from the peer as well. // term_req_sent1: 'terminate' was explicitly called by the user,
// term_req_sent2: user called 'terminate' and then we've got
// term command from the peer as well.
enum { enum {
active, active,
delimited, delimiter_received,
pending, waiting_for_delimiter,
terminating, term_ack_sent,
terminated, term_req_sent1,
double_terminated term_req_sent2
} state; } state;
// If true, we receive all the pending inbound messages before // If true, we receive all the pending inbound messages before
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment