Commit 87a6490b authored by Martin Sustrik's avatar Martin Sustrik

All pipe termination code moved to pipe_t

Till now the code was spread over mutliple locations.
Additionally, the code was made more formally correct,
with explicit pipe state machine etc.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 3d4203de
...@@ -62,9 +62,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, ...@@ -62,9 +62,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
peers_msgs_read (0), peers_msgs_read (0),
peer (NULL), peer (NULL),
sink (NULL), sink (NULL),
terminating (false), state (active),
term_recvd (false),
delimited (false),
delay (delay_) delay (delay_)
{ {
} }
...@@ -89,7 +87,7 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) ...@@ -89,7 +87,7 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
bool zmq::pipe_t::check_read () bool zmq::pipe_t::check_read ()
{ {
if (unlikely (!in_active)) if (unlikely (!in_active || (state != active && state != pending)))
return false; return false;
// Check if there's an item in the pipe. // Check if there's an item in the pipe.
...@@ -104,13 +102,7 @@ bool zmq::pipe_t::check_read () ...@@ -104,13 +102,7 @@ bool zmq::pipe_t::check_read ()
msg_t msg; msg_t msg;
bool ok = inpipe->read (&msg); bool ok = inpipe->read (&msg);
zmq_assert (ok); zmq_assert (ok);
delimited = true; delimit ();
// If pipe_term was already received but wasn't processed because
// of pending messages, we can ack it now.
if (term_recvd)
send_pipe_term_ack (peer);
return false; return false;
} }
...@@ -119,7 +111,7 @@ bool zmq::pipe_t::check_read () ...@@ -119,7 +111,7 @@ bool zmq::pipe_t::check_read ()
bool zmq::pipe_t::read (msg_t *msg_) bool zmq::pipe_t::read (msg_t *msg_)
{ {
if (unlikely (!in_active)) if (unlikely (!in_active || (state != active && state != pending)))
return false; return false;
if (!inpipe->read (msg_)) { if (!inpipe->read (msg_)) {
...@@ -129,13 +121,7 @@ bool zmq::pipe_t::read (msg_t *msg_) ...@@ -129,13 +121,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
// If delimiter was read, start termination process of the pipe. // If delimiter was read, start termination process of the pipe.
if (msg_->is_delimiter ()) { if (msg_->is_delimiter ()) {
delimited = true; delimit ();
// If pipe_term was already received but wasn't processed because
// of pending messages, we can ack it now.
if (term_recvd)
send_pipe_term_ack (peer);
return false; return false;
} }
...@@ -150,7 +136,7 @@ bool zmq::pipe_t::read (msg_t *msg_) ...@@ -150,7 +136,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
bool zmq::pipe_t::check_write (msg_t *msg_) bool zmq::pipe_t::check_write (msg_t *msg_)
{ {
if (unlikely (!out_active)) if (unlikely (!out_active || state != active))
return false; return false;
bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm); bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm);
...@@ -188,13 +174,21 @@ void zmq::pipe_t::rollback () ...@@ -188,13 +174,21 @@ void zmq::pipe_t::rollback ()
void zmq::pipe_t::flush () void zmq::pipe_t::flush ()
{ {
// If terminate() was already called do nothing.
if (state == terminated && state == double_terminated)
return;
// The peer does not exist anymore at this point.
if (state == terminating)
return;
if (!outpipe->flush ()) if (!outpipe->flush ())
send_activate_read (peer); send_activate_read (peer);
} }
void zmq::pipe_t::process_activate_read () void zmq::pipe_t::process_activate_read ()
{ {
if (!in_active && !terminating) { if (!in_active && (state == active || state == pending)) {
in_active = true; in_active = true;
sink->read_activated (this); sink->read_activated (this);
} }
...@@ -205,7 +199,7 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) ...@@ -205,7 +199,7 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
// Remember the peers's message sequence number. // Remember the peers's message sequence number.
peers_msgs_read = msgs_read_; peers_msgs_read = msgs_read_;
if (!out_active && !terminating) { if (!out_active && state == active) {
out_active = true; out_active = true;
sink->write_activated (this); sink->write_activated (this);
} }
...@@ -213,16 +207,41 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) ...@@ -213,16 +207,41 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
void zmq::pipe_t::process_pipe_term () void zmq::pipe_t::process_pipe_term ()
{ {
term_recvd = true; // This is the simple case of peer-induced termination. If there are no
// more pending messages to read, or if the pipe was configured to drop
// We can proceed with the termination if one of the following is true: // pending messages, we can move directly to the terminating state.
// 1. User asked this side of pipe to terminate already. // Otherwise we'll hang up in pending state till all the pending messages
// 2. Waiting for pending messages in not required. // are sent.
// 3. Delimiter was already received. if (state == active) {
if (terminating || !delay || delimited) { if (!delay) {
terminating = true; state = terminating;
send_pipe_term_ack (peer);
}
else {
state = pending;
}
return;
}
// Delimiter happened to arrive before the term command. Now we have the
// term command as well, so we can move straight to terminating state.
if (state == delimited) {
state = terminating;
send_pipe_term_ack (peer);
return;
}
// This is the case where both ends of the pipe are closed in parallel.
// We simply reply to the request by ack and continue waiting for our
// own ack.
if (state == terminated) {
state = double_terminated;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
return;
} }
// pipe_term is invalid in other states.
zmq_assert (false);
} }
void zmq::pipe_t::process_pipe_term_ack () void zmq::pipe_t::process_pipe_term_ack ()
...@@ -231,10 +250,16 @@ void zmq::pipe_t::process_pipe_term_ack () ...@@ -231,10 +250,16 @@ void zmq::pipe_t::process_pipe_term_ack ()
zmq_assert (sink); zmq_assert (sink);
sink->terminated (this); sink->terminated (this);
// If the peer haven't asked for the termination itself, we have to // In terminating and double_terminated states there's nothing to do.
// ack the ack, so that it can deallocate properly. // Simply deallocate the pipe. In terminated state we have to ack the
if (!term_recvd) // peer before deallocating this side of the pipe. All the other states
// are invalid.
if (state == terminating) ;
else if (state == double_terminated);
else if (state == terminated)
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
else
zmq_assert (false);
// We'll deallocate the inbound pipe, the peer will deallocate the outbound // We'll deallocate the inbound pipe, the peer will deallocate the outbound
// pipe (which is an inbound pipe from its point of view). // pipe (which is an inbound pipe from its point of view).
...@@ -254,10 +279,40 @@ void zmq::pipe_t::process_pipe_term_ack () ...@@ -254,10 +279,40 @@ void zmq::pipe_t::process_pipe_term_ack ()
void zmq::pipe_t::terminate () void zmq::pipe_t::terminate ()
{ {
// Prevent double termination. // If terminate was already called, we can ignore the duplicit invocation.
if (terminating) if (state == terminated || state == double_terminated)
return; return;
terminating = true;
// If the pipe is in the final phase of async termination, it's going to
// closed anyway. No need to do anything special here.
else if (state == terminating)
return;
// The simple sync termination case. Ask the peer to terminate and wait
// for the ack.
else if (state == active) {
send_pipe_term (peer);
state = terminated;
}
// There are still pending messages available, but the user calls
// 'terminate'. We can act as if all the pending messages were read.
else if (state == pending) {
send_pipe_term_ack (peer);
state = terminating;
}
// We've already got delimiter, but not term command yet. We can ignore
// the delimiter and ack synchronously terminate as if we were in
// active state.
else if (state == delimited) {
send_pipe_term (peer);
state = terminated;
}
// There are no other states.
else
zmq_assert (false);
// Stop inbound and outbound flow of messages. // Stop inbound and outbound flow of messages.
in_active = false; in_active = false;
...@@ -272,9 +327,6 @@ void zmq::pipe_t::terminate () ...@@ -272,9 +327,6 @@ void zmq::pipe_t::terminate ()
msg.init_delimiter (); msg.init_delimiter ();
outpipe->write (msg, false); outpipe->write (msg, false);
flush (); flush ();
// Start the termination handshaking.
send_pipe_term (peer);
} }
bool zmq::pipe_t::is_delimiter (msg_t &msg_) bool zmq::pipe_t::is_delimiter (msg_t &msg_)
...@@ -309,3 +361,20 @@ int zmq::pipe_t::compute_lwm (int hwm_) ...@@ -309,3 +361,20 @@ int zmq::pipe_t::compute_lwm (int hwm_)
return result; return result;
} }
void zmq::pipe_t::delimit ()
{
if (state == active) {
state = delimited;
return;
}
if (state == pending) {
send_pipe_term_ack (peer);
state = terminating;
return;
}
// Delimiter in any other state is invalid.
zmq_assert (false);
}
...@@ -99,6 +99,9 @@ namespace zmq ...@@ -99,6 +99,9 @@ namespace zmq
void process_pipe_term (); void process_pipe_term ();
void process_pipe_term_ack (); void process_pipe_term_ack ();
// Handler for delimiter read from the pipe.
void delimit ();
// Type of the underlying lock-free pipe. // Type of the underlying lock-free pipe.
typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t; typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t;
...@@ -142,15 +145,23 @@ namespace zmq ...@@ -142,15 +145,23 @@ namespace zmq
// Sink to send events to. // Sink to send events to.
i_pipe_events *sink; i_pipe_events *sink;
// True is 'terminate' method was called or termination request // State of the pipe endpoint. Active is common state before any
// was received from the peer. // termination begins. Delimited means that delimiter was read from
bool terminating; // pipe before term command was received. Pending means that term
// command was already received from the peer but there are still
// True is we've already got pipe_term command from the peer. // pending messages to read. Terminating means that all pending
bool term_recvd; // messages were already read and all we are waiting for is ack from
// the peer. Terminated means that 'terminate' was explicitly called
// True if delimiter was already received from the peer. // by the user. Double_terminated means that user called 'terminate'
bool delimited; // and then we've got term command from the peer as well.
enum {
active,
delimited,
pending,
terminating,
terminated,
double_terminated
} state;
// If true, we receive all the pending inbound messages before // If true, we receive all the pending inbound messages before
// terminating. If false, we terminate immediately when the peer // terminating. If false, we terminate immediately when the peer
......
...@@ -31,47 +31,35 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, ...@@ -31,47 +31,35 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
io_object_t (io_thread_), io_object_t (io_thread_),
pipe (NULL), pipe (NULL),
incomplete_in (false), incomplete_in (false),
terminating (false),
engine (NULL), engine (NULL),
socket (socket_), socket (socket_),
io_thread (io_thread_), io_thread (io_thread_),
pipe_attached (false), has_linger_timer (false)
delimiter_processed (false), {
force_terminate (false),
has_linger_timer (false),
state (active)
{
} }
zmq::session_t::~session_t () zmq::session_t::~session_t ()
{ {
zmq_assert (!pipe); zmq_assert (!pipe);
if (engine)
engine->terminate ();
}
void zmq::session_t::proceed_with_term ()
{
if (state == terminating)
return;
zmq_assert (state == pending);
state = terminating;
// If there's still a pending linger timer, remove it. // If there's still a pending linger timer, remove it.
if (has_linger_timer) { if (has_linger_timer) {
cancel_timer (linger_timer_id); cancel_timer (linger_timer_id);
has_linger_timer = false; has_linger_timer = false;
} }
if (pipe) { // Close the engine.
register_term_acks (1); if (engine)
pipe->terminate (); engine->terminate ();
} }
// The session has already waited for the linger period. We don't want void zmq::session_t::attach_pipe (pipe_t *pipe_)
// the child objects to linger any more thus linger is set to zero. {
own_t::process_term (0); zmq_assert (!pipe);
zmq_assert (pipe_);
pipe = pipe_;
pipe->set_event_sink (this);
} }
bool zmq::session_t::read (msg_t *msg_) bool zmq::session_t::read (msg_t *msg_)
...@@ -127,37 +115,13 @@ void zmq::session_t::clean_pipes () ...@@ -127,37 +115,13 @@ void zmq::session_t::clean_pipes ()
} }
} }
void zmq::session_t::attach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (!pipe_attached);
pipe_attached = true;
if (pipe_) {
zmq_assert (!pipe);
pipe = pipe_;
pipe->set_event_sink (this);
}
// If we are already terminating, terminate the pipes straight away.
if (state == terminating) {
if (pipe) {
pipe->terminate ();
register_term_acks (1);
}
}
}
void zmq::session_t::terminated (pipe_t *pipe_) void zmq::session_t::terminated (pipe_t *pipe_)
{ {
// Drop the reference to the deallocated pipe.
zmq_assert (pipe == pipe_); zmq_assert (pipe == pipe_);
// If we are in process of being closed, but still waiting for all
// pending messeges being sent, we can terminate here.
if (state == pending)
proceed_with_term ();
pipe = NULL; pipe = NULL;
if (state == terminating)
if (terminating)
unregister_term_ack (); unregister_term_ack ();
} }
...@@ -189,7 +153,7 @@ void zmq::session_t::process_attach (i_engine *engine_, ...@@ -189,7 +153,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
// If we are already terminating, we destroy the engine straight away. // If we are already terminating, we destroy the engine straight away.
// Note that we don't have to unplug it before deleting as it's not // Note that we don't have to unplug it before deleting as it's not
// yet plugged to the session. // yet plugged to the session.
if (state == terminating) { if (terminating) {
if (engine_) if (engine_)
delete engine_; delete engine_;
return; return;
...@@ -209,12 +173,8 @@ void zmq::session_t::process_attach (i_engine *engine_, ...@@ -209,12 +173,8 @@ void zmq::session_t::process_attach (i_engine *engine_,
return; return;
} }
// Check whether the required pipe already exists and create it // Create the pipe if it does not exist yet.
// if it does not. if (!pipe) {
if (!pipe_attached) {
zmq_assert (!pipe);
pipe_attached = true;
object_t *parents [2] = {this, socket}; object_t *parents [2] = {this, socket};
pipe_t *pipes [2] = {NULL, NULL}; pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm}; int hwms [2] = {options.rcvhwm, options.sndhwm};
...@@ -226,6 +186,7 @@ void zmq::session_t::process_attach (i_engine *engine_, ...@@ -226,6 +186,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
pipes [0]->set_event_sink (this); pipes [0]->set_event_sink (this);
// Remember the local end of the pipe. // Remember the local end of the pipe.
zmq_assert (!pipe);
pipe = pipes [0]; pipe = pipes [0];
// Ask socket to plug into the remote end of the pipe. // Ask socket to plug into the remote end of the pipe.
...@@ -249,43 +210,45 @@ void zmq::session_t::detach () ...@@ -249,43 +210,45 @@ void zmq::session_t::detach ()
// Send the event to the derived class. // Send the event to the derived class.
detached (); detached ();
// Just in case there's only a delimiter in the inbound pipe. // Just in case there's only a delimiter in the pipe.
if (pipe) if (pipe)
pipe->check_read (); pipe->check_read ();
} }
void zmq::session_t::process_term (int linger_) void zmq::session_t::process_term (int linger_)
{ {
zmq_assert (state == active); // If termination is already underway, do nothing.
state = pending; if (!terminating) {
// If linger is set to zero, we can terminate the session straight away terminating = true;
// not waiting for the pending messages to be sent.
if (linger_ == 0) { // If the termination of the pipe happens before the term command is
proceed_with_term (); // delivered there's nothing much to do. We can proceed with the
return; // stadard termination immediately.
if (pipe) {
// We're going to wait till the pipe terminates.
register_term_acks (1);
// If linger is set to zero, we can ask pipe to terminate without
// waiting for pending messages to be read.
if (linger_ == 0)
pipe->terminate ();
// If there's finite linger value, set up a timer.
if (linger_ > 0) {
zmq_assert (!has_linger_timer);
add_timer (linger_, linger_timer_id);
has_linger_timer = true;
}
// In case there's no engine and there's only delimiter in the pipe it
// wouldn't be ever read. Thus we check for it explicitly.
pipe->check_read ();
}
} }
// If there's finite linger value, set up a timer. own_t::process_term (0);
if (linger_ > 0) {
zmq_assert (!has_linger_timer);
add_timer (linger_, linger_timer_id);
has_linger_timer = true;
}
// If there's no engine and there's only delimiter in the pipe it wouldn't
// be ever read. Thus we check for it explicitly.
if (pipe)
pipe->check_read ();
// If there's no in pipe, there are no pending messages to send.
// We can proceed with the shutdown straight away. Also, if there is
// pipe, but the delimiter was already processed, we can terminate
// immediately. Alternatively, if the derived session type have
// called 'terminate' we'll finish straight away.
if (delimiter_processed || force_terminate ||
(!options.immediate_connect && !pipe))
proceed_with_term ();
} }
void zmq::session_t::timer_event (int id_) void zmq::session_t::timer_event (int id_)
...@@ -294,7 +257,10 @@ void zmq::session_t::timer_event (int id_) ...@@ -294,7 +257,10 @@ void zmq::session_t::timer_event (int id_)
// there are still pending messages to be sent. // there are still pending messages to be sent.
zmq_assert (id_ == linger_timer_id); zmq_assert (id_ == linger_timer_id);
has_linger_timer = false; has_linger_timer = false;
proceed_with_term ();
// Ask pipe to terminate even though there may be pending messages in it.
zmq_assert (pipe);
pipe->terminate ();
} }
bool zmq::session_t::has_engine () bool zmq::session_t::has_engine ()
...@@ -314,6 +280,19 @@ void zmq::session_t::unregister_session (const blob_t &name_) ...@@ -314,6 +280,19 @@ void zmq::session_t::unregister_session (const blob_t &name_)
void zmq::session_t::terminate () void zmq::session_t::terminate ()
{ {
force_terminate = true; // If termination process is already underway, do nothing.
own_t::terminate (); if (!terminating) {
terminating = true;
// If the pipe was already terminated, there's nothing much to do.
// If it wasn't, we'll ask it to terminate.
if (pipe) {
register_term_acks (1);
pipe->terminate ();
}
}
own_t::terminate ();
} }
...@@ -41,6 +41,9 @@ namespace zmq ...@@ -41,6 +41,9 @@ namespace zmq
session_t (class io_thread_t *io_thread_, session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_); class socket_base_t *socket_, const options_t &options_);
// To be used once only, when creating the session.
void attach_pipe (class pipe_t *pipe_);
// i_inout interface implementation. Note that detach method is not // i_inout interface implementation. Note that detach method is not
// implemented by generic session. Different session types may handle // implemented by generic session. Different session types may handle
// engine disconnection in different ways. // engine disconnection in different ways.
...@@ -49,8 +52,6 @@ namespace zmq ...@@ -49,8 +52,6 @@ namespace zmq
void flush (); void flush ();
void detach (); void detach ();
void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
// i_pipe_events interface implementation. // i_pipe_events interface implementation.
void read_activated (class pipe_t *pipe_); void read_activated (class pipe_t *pipe_);
void write_activated (class pipe_t *pipe_); void write_activated (class pipe_t *pipe_);
...@@ -59,7 +60,7 @@ namespace zmq ...@@ -59,7 +60,7 @@ namespace zmq
protected: protected:
// This function allows to shut down the session even though // This function allows to shut down the session even though
// there are pending messages in the inbound pipe. // there are messages pending.
void terminate (); void terminate ();
// Two events for the derived session type. Attached is triggered // Two events for the derived session type. Attached is triggered
...@@ -104,6 +105,10 @@ namespace zmq ...@@ -104,6 +105,10 @@ namespace zmq
// is still in the in pipe. // is still in the in pipe.
bool incomplete_in; bool incomplete_in;
// If true the termination process is already underway, ie. term ack
// for the pipe was already registered etc.
bool terminating;
// The protocol I/O engine connected to the session. // The protocol I/O engine connected to the session.
struct i_engine *engine; struct i_engine *engine;
...@@ -114,28 +119,12 @@ namespace zmq ...@@ -114,28 +119,12 @@ namespace zmq
// the engines into the same thread. // the engines into the same thread.
class io_thread_t *io_thread; class io_thread_t *io_thread;
// If true, pipe was already attached to this session.
bool pipe_attached;
// If true, delimiter was already read from the inbound pipe.
bool delimiter_processed;
// If true, we should terminate the session even though there are
// pending messages in the inbound pipe.
bool force_terminate;
// ID of the linger timer // ID of the linger timer
enum {linger_timer_id = 0x20}; enum {linger_timer_id = 0x20};
// True is linger timer is running. // True is linger timer is running.
bool has_linger_timer; bool has_linger_timer;
enum {
active,
pending,
terminating
} state;
session_t (const session_t&); session_t (const session_t&);
const session_t &operator = (const session_t&); const session_t &operator = (const session_t&);
}; };
......
...@@ -450,8 +450,8 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -450,8 +450,8 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach local end of the pipe to the socket object. // Attach local end of the pipe to the socket object.
attach_pipe (pipes [0], blob_t ()); attach_pipe (pipes [0], blob_t ());
// Attach remote end of the pipe to the session object. // Attach remote end of the pipe to the session object later on.
session->attach_pipe (pipes [1], blob_t ()); session->attach_pipe (pipes [1]);
} }
// Activate the session. Make it a child of this socket. // Activate the session. Make it a child of this socket.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment