Commit cb35fd7b authored by Ian Barber's avatar Ian Barber

Attempt to fix disconnect not respecting linger

Looks like linger is honoured properly, but shutting down the session
causes the pipe termination to come from that side - because the local
pipe then shuts down right away it seems to trigger a terminated on the
other end instead of waiting. This way we trigger the termination from
the local end and then terminate the session.
parent ed272fe5
...@@ -376,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -376,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_)
// Save last endpoint URI // Save last endpoint URI
listener->get_address (last_endpoint); listener->get_address (last_endpoint);
add_endpoint (addr_, (own_t *) listener); add_endpoint (addr_, (own_t *) listener, NULL);
return 0; return 0;
} }
...@@ -395,7 +395,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -395,7 +395,7 @@ int zmq::socket_base_t::bind (const char *addr_)
// Save last endpoint URI // Save last endpoint URI
listener->get_address (last_endpoint); listener->get_address (last_endpoint);
add_endpoint (addr_, (own_t *) listener); add_endpoint (addr_, (own_t *) listener, NULL);
return 0; return 0;
} }
#endif #endif
...@@ -548,6 +548,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -548,6 +548,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// PGM does not support subscription forwarding; ask for all data to be // PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe. // sent to this pipe.
bool icanhasall = protocol == "pgm" || protocol == "epgm"; bool icanhasall = protocol == "pgm" || protocol == "epgm";
pipe_t *newpipe = NULL;
if (options.immediate != 1 || icanhasall) { if (options.immediate != 1 || icanhasall) {
// Create a bi-directional pipe. // Create a bi-directional pipe.
...@@ -560,6 +561,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -560,6 +561,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach local end of the pipe to the socket object. // Attach local end of the pipe to the socket object.
attach_pipe (new_pipes [0], icanhasall); attach_pipe (new_pipes [0], icanhasall);
newpipe = new_pipes [0];
// Attach remote end of the pipe to the session object later on. // Attach remote end of the pipe to the session object later on.
session->attach_pipe (new_pipes [1]); session->attach_pipe (new_pipes [1]);
...@@ -568,15 +570,15 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -568,15 +570,15 @@ int zmq::socket_base_t::connect (const char *addr_)
// Save last endpoint URI // Save last endpoint URI
paddr->to_string (last_endpoint); paddr->to_string (last_endpoint);
add_endpoint (addr_, (own_t *) session); add_endpoint (addr_, (own_t *) session, newpipe);
return 0; return 0;
} }
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_) void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
{ {
// Activate the session. Make it a child of this socket. // Activate the session. Make it a child of this socket.
launch_child (endpoint_); launch_child (endpoint_);
endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_)); endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t(endpoint_, pipe)));
} }
int zmq::socket_base_t::term_endpoint (const char *addr_) int zmq::socket_base_t::term_endpoint (const char *addr_)
...@@ -631,8 +633,12 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) ...@@ -631,8 +633,12 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
return -1; return -1;
} }
for (endpoints_t::iterator it = range.first; it != range.second; ++it) for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
term_child (it->second); // If we have an associated pipe, terminate it.
if (it->second.second != NULL)
it->second.second->terminate(false);
term_child (it->second.first);
}
endpoints.erase (range.first, range.second); endpoints.erase (range.first, range.second);
return 0; return 0;
} }
......
...@@ -158,10 +158,11 @@ namespace zmq ...@@ -158,10 +158,11 @@ namespace zmq
private: private:
// Creates new endpoint ID and adds the endpoint to the map. // Creates new endpoint ID and adds the endpoint to the map.
void add_endpoint (const char *addr_, own_t *endpoint_); void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe);
// Map of open endpoints. // Map of open endpoints.
typedef std::multimap <std::string, own_t *> endpoints_t; typedef std::pair <own_t *, pipe_t*> endpoint_pipe_t;
typedef std::multimap <std::string, endpoint_pipe_t> endpoints_t;
endpoints_t endpoints; endpoints_t endpoints;
// Map of open inproc endpoints. // Map of open inproc endpoints.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment