Commit f0316771 authored by Martin Sustrik's avatar Martin Sustrik

rollback of half-processed messages in case of disconnection

parent dfdaff5e
...@@ -28,6 +28,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, ...@@ -28,6 +28,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_) : const options_t &options_) :
owned_t (parent_, owner_), owned_t (parent_, owner_),
in_pipe (NULL), in_pipe (NULL),
incomplete_in (false),
active (true), active (true),
out_pipe (NULL), out_pipe (NULL),
engine (NULL), engine (NULL),
...@@ -72,7 +73,11 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) ...@@ -72,7 +73,11 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
if (!in_pipe || !active) if (!in_pipe || !active)
return false; return false;
return in_pipe->read (msg_); if (!in_pipe->read (msg_))
return false;
incomplete_in = msg_->flags & ZMQ_MSG_TBC;
return true;
} }
bool zmq::session_t::write (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_)
...@@ -102,6 +107,26 @@ void zmq::session_t::detach (owned_t *reconnecter_) ...@@ -102,6 +107,26 @@ void zmq::session_t::detach (owned_t *reconnecter_)
// Engine is terminating itself. No need to deallocate it from here. // Engine is terminating itself. No need to deallocate it from here.
engine = NULL; engine = NULL;
// Get rid of half-processed messages in the out pipe. Flush any
// unflushed messages upstream.
if (out_pipe) {
out_pipe->rollback ();
out_pipe->flush ();
}
// Remove any half-read message from the in pipe.
if (in_pipe) {
while (incomplete_in) {
zmq_msg_t msg;
zmq_msg_init (&msg);
if (!read (&msg)) {
zmq_assert (!incomplete_in);
break;
}
zmq_msg_close (&msg);
}
}
// Terminate transient session. // Terminate transient session.
if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0)) if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0))
term (); term ();
......
...@@ -72,6 +72,10 @@ namespace zmq ...@@ -72,6 +72,10 @@ namespace zmq
// Inbound pipe, i.e. one the session is getting messages from. // Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe; class reader_t *in_pipe;
// This flag is true if the remainder of the message being processed
// is still in the in pipe.
bool incomplete_in;
// If true, in_pipe is active. Otherwise there are no messages to get. // If true, in_pipe is active. Otherwise there are no messages to get.
bool active; bool active;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment