Commit 4aa5ba3d authored by Ian Barber's avatar Ian Barber

Replace incomplete count with a std::set

This commit removes the countdown flag and adds a set to store the pipes
that are currently being disconnected.
parent 7b105865
...@@ -111,7 +111,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, ...@@ -111,7 +111,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
io_object_t (io_thread_), io_object_t (io_thread_),
connect (connect_), connect (connect_),
pipe (NULL), pipe (NULL),
incomplete_detach (0),
incomplete_in (false), incomplete_in (false),
pending (false), pending (false),
engine (NULL), engine (NULL),
...@@ -231,31 +230,26 @@ void zmq::session_base_t::clean_pipes () ...@@ -231,31 +230,26 @@ void zmq::session_base_t::clean_pipes ()
void zmq::session_base_t::terminated (pipe_t *pipe_) void zmq::session_base_t::terminated (pipe_t *pipe_)
{ {
// Drop the reference to the deallocated pipe if required. // Drop the reference to the deallocated pipe if required.
zmq_assert (pipe == pipe_ || incomplete_detach > 0); zmq_assert (pipe == pipe_ || incomplete_pipes.size () > 0);
// If we still have pipes outstanding, decrement. if (pipe == pipe_)
// This will only have been set in a disconnect situation // If this is our current pipe, remove it
// where delay_attach_on_connect is 1. pipe = NULL;
if (incomplete_detach > 0) else
incomplete_detach --; // Remove the pipe from the detached pipes set
incomplete_pipes.erase (pipe_);
// If there are still extra detached pipes, don't continue
if (incomplete_detach > 0)
return;
pipe = NULL;
// If we are waiting for pending messages to be sent, at this point // If we are waiting for pending messages to be sent, at this point
// we are sure that there will be no more messages and we can proceed // we are sure that there will be no more messages and we can proceed
// with termination safely. // with termination safely.
if (pending) if (pending && !pipe && incomplete_pipes.size () == 0)
proceed_with_term (); proceed_with_term ();
} }
void zmq::session_base_t::read_activated (pipe_t *pipe_) void zmq::session_base_t::read_activated (pipe_t *pipe_)
{ {
// Skip activating if we're detaching this pipe // Skip activating if we're detaching this pipe
if (incomplete_detach > 0 && pipe_ != pipe) if (incomplete_pipes.size () > 0 && pipe_ != pipe)
return; return;
zmq_assert (pipe == pipe_); zmq_assert (pipe == pipe_);
...@@ -269,7 +263,7 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_) ...@@ -269,7 +263,7 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)
void zmq::session_base_t::write_activated (pipe_t *pipe_) void zmq::session_base_t::write_activated (pipe_t *pipe_)
{ {
// Skip activating if we're detaching this pipe // Skip activating if we're detaching this pipe
if (incomplete_detach > 0 && pipe_ != pipe) if (incomplete_pipes.size () > 0 && pipe_ != pipe)
return; return;
zmq_assert (pipe == pipe_); zmq_assert (pipe == pipe_);
...@@ -311,7 +305,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_) ...@@ -311,7 +305,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
zmq_assert (engine_ != NULL); zmq_assert (engine_ != NULL);
// Create the pipe if it does not exist yet. // Create the pipe if it does not exist yet.
if ((!pipe || incomplete_detach > 0) && !is_terminating ()) { if (!pipe && !is_terminating ()) {
object_t *parents [2] = {this, socket}; object_t *parents [2] = {this, socket};
pipe_t *pipes [2] = {NULL, NULL}; pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm}; int hwms [2] = {options.rcvhwm, options.sndhwm};
...@@ -421,7 +415,8 @@ void zmq::session_base_t::detached () ...@@ -421,7 +415,8 @@ void zmq::session_base_t::detached ()
&& addr->protocol != "pgm" && addr->protocol != "epgm") { && addr->protocol != "pgm" && addr->protocol != "epgm") {
pipe->hiccup (); pipe->hiccup ();
pipe->terminate (false); pipe->terminate (false);
incomplete_detach ++; incomplete_pipes.insert (pipe);
pipe = NULL;
} }
reset (); reset ();
......
...@@ -104,8 +104,8 @@ namespace zmq ...@@ -104,8 +104,8 @@ namespace zmq
// Pipe connecting the session to its socket. // Pipe connecting the session to its socket.
zmq::pipe_t *pipe; zmq::pipe_t *pipe;
// This flag is set if we are disconnecting, but haven't yet completed // This set is added to with pipes we are disconnecting, but haven't yet completed
int incomplete_detach; std::set<pipe_t *> incomplete_pipes;
// This flag is true if the remainder of the message being processed // This flag is true if the remainder of the message being processed
// is still in the in pipe. // is still in the in pipe.
......
...@@ -33,8 +33,6 @@ static void *server (void *c) ...@@ -33,8 +33,6 @@ static void *server (void *c)
char buffer[16]; char buffer[16];
int rc, val; int rc, val;
shoulddie = *(long *)sd;
context = zmq_init (1); context = zmq_init (1);
assert (context); assert (context);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment