Commit 718885fd authored by Martin Sustrik's avatar Martin Sustrik

Pending messages are delivered even if connection doesn't exist yet

Bug in previous refactoring fixed.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 87a6490b
...@@ -216,12 +216,13 @@ void zmq::pipe_t::process_pipe_term () ...@@ -216,12 +216,13 @@ void zmq::pipe_t::process_pipe_term ()
if (!delay) { if (!delay) {
state = terminating; state = terminating;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
return;
} }
else { else {
state = pending; state = pending;
}
return; return;
} }
}
// Delimiter happened to arrive before the term command. Now we have the // Delimiter happened to arrive before the term command. Now we have the
// term command as well, so we can move straight to terminating state. // term command as well, so we can move straight to terminating state.
......
...@@ -31,7 +31,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, ...@@ -31,7 +31,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
io_object_t (io_thread_), io_object_t (io_thread_),
pipe (NULL), pipe (NULL),
incomplete_in (false), incomplete_in (false),
terminating (false), pending (false),
engine (NULL), engine (NULL),
socket (socket_), socket (socket_),
io_thread (io_thread_), io_thread (io_thread_),
...@@ -121,8 +121,11 @@ void zmq::session_t::terminated (pipe_t *pipe_) ...@@ -121,8 +121,11 @@ void zmq::session_t::terminated (pipe_t *pipe_)
zmq_assert (pipe == pipe_); zmq_assert (pipe == pipe_);
pipe = NULL; pipe = NULL;
if (terminating) // If we are waiting for pending messages to be sent, at this point
unregister_term_ack (); // we are sure that there will be no more messages and we can proceed
// with termination safely.
if (pending)
proceed_with_term ();
} }
void zmq::session_t::read_activated (pipe_t *pipe_) void zmq::session_t::read_activated (pipe_t *pipe_)
...@@ -150,15 +153,6 @@ void zmq::session_t::process_plug () ...@@ -150,15 +153,6 @@ void zmq::session_t::process_plug ()
void zmq::session_t::process_attach (i_engine *engine_, void zmq::session_t::process_attach (i_engine *engine_,
const blob_t &peer_identity_) const blob_t &peer_identity_)
{ {
// If we are already terminating, we destroy the engine straight away.
// Note that we don't have to unplug it before deleting as it's not
// yet plugged to the session.
if (terminating) {
if (engine_)
delete engine_;
return;
}
// If some other object (e.g. init) notifies us that the connection failed // If some other object (e.g. init) notifies us that the connection failed
// without creating an engine we need to start the reconnection process. // without creating an engine we need to start the reconnection process.
if (!engine_) { if (!engine_) {
...@@ -217,37 +211,52 @@ void zmq::session_t::detach () ...@@ -217,37 +211,52 @@ void zmq::session_t::detach ()
void zmq::session_t::process_term (int linger_) void zmq::session_t::process_term (int linger_)
{ {
// If termination is already underway, do nothing. zmq_assert (!pending);
if (!terminating) {
terminating = true;
// If the termination of the pipe happens before the term command is // If the termination of the pipe happens before the term command is
// delivered there's nothing much to do. We can proceed with the // delivered there's nothing much to do. We can proceed with the
// stadard termination immediately. // stadard termination immediately.
if (pipe) { if (!pipe) {
proceed_with_term ();
// We're going to wait till the pipe terminates. return;
register_term_acks (1); }
// If linger is set to zero, we can ask pipe to terminate without // If linger is set to zero, we can ask pipe to terminate without
// waiting for pending messages to be read. // waiting for pending messages to be read.
if (linger_ == 0) if (linger_ == 0) {
pipe->terminate (); proceed_with_term ();
return;
}
pending = true;
// If there's finite linger value, set up a timer. // If there's finite linger value, delay the termination.
// If linger is infinite (negative) we don't even have to set
// the timer.
if (linger_ > 0) { if (linger_ > 0) {
zmq_assert (!has_linger_timer); zmq_assert (!has_linger_timer);
add_timer (linger_, linger_timer_id); add_timer (linger_, linger_timer_id);
has_linger_timer = true; has_linger_timer = true;
} }
// In case there's no engine and there's only delimiter in the pipe it // In case there's no engine and there's only delimiter in the
// wouldn't be ever read. Thus we check for it explicitly. // pipe it wouldn't be ever read. Thus we check for it explicitly.
pipe->check_read (); pipe->check_read ();
} }
void zmq::session_t::proceed_with_term ()
{
// The pending phase have just ended.
pending = false;
// If there's pipe attached to the session, we have to wait till it
// terminates.
if (pipe) {
register_term_acks (1);
pipe->terminate ();
} }
// Continue with standard termination.
own_t::process_term (0); own_t::process_term (0);
} }
...@@ -260,7 +269,7 @@ void zmq::session_t::timer_event (int id_) ...@@ -260,7 +269,7 @@ void zmq::session_t::timer_event (int id_)
// Ask pipe to terminate even though there may be pending messages in it. // Ask pipe to terminate even though there may be pending messages in it.
zmq_assert (pipe); zmq_assert (pipe);
pipe->terminate (); proceed_with_term ();
} }
bool zmq::session_t::has_engine () bool zmq::session_t::has_engine ()
...@@ -278,21 +287,4 @@ void zmq::session_t::unregister_session (const blob_t &name_) ...@@ -278,21 +287,4 @@ void zmq::session_t::unregister_session (const blob_t &name_)
socket->unregister_session (name_); socket->unregister_session (name_);
} }
void zmq::session_t::terminate ()
{
// If termination process is already underway, do nothing.
if (!terminating) {
terminating = true;
// If the pipe was already terminated, there's nothing much to do.
// If it wasn't, we'll ask it to terminate.
if (pipe) {
register_term_acks (1);
pipe->terminate ();
}
}
own_t::terminate ();
}
...@@ -59,10 +59,6 @@ namespace zmq ...@@ -59,10 +59,6 @@ namespace zmq
protected: protected:
// This function allows to shut down the session even though
// there are messages pending.
void terminate ();
// Two events for the derived session type. Attached is triggered // Two events for the derived session type. Attached is triggered
// when session is attached to a peer. The function can reject the new // when session is attached to a peer. The function can reject the new
// peer by returning false. Detached is triggered at the beginning of // peer by returning false. Detached is triggered at the beginning of
...@@ -105,9 +101,9 @@ namespace zmq ...@@ -105,9 +101,9 @@ namespace zmq
// is still in the in pipe. // is still in the in pipe.
bool incomplete_in; bool incomplete_in;
// If true the termination process is already underway, ie. term ack // True if termination have been suspended to push the pending
// for the pipe was already registered etc. // messages to the network.
bool terminating; bool pending;
// The protocol I/O engine connected to the session. // The protocol I/O engine connected to the session.
struct i_engine *engine; struct i_engine *engine;
......
...@@ -213,7 +213,7 @@ int zmq::tcp_socket_t::read (void *data_, size_t size_) ...@@ -213,7 +213,7 @@ int zmq::tcp_socket_t::read (void *data_, size_t size_)
errno_assert (nbytes != -1); errno_assert (nbytes != -1);
// Orderly shutdown by the other peer. // Orderly shutdown by the peer.
if (nbytes == 0) if (nbytes == 0)
return -1; return -1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment