Commit b3ca7fd4 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #627 from ianbarber/master

Attempt to fix disconnect not respecting linger
parents 1011e8ad cb35fd7b
...@@ -376,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -376,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_)
// Save last endpoint URI // Save last endpoint URI
listener->get_address (last_endpoint); listener->get_address (last_endpoint);
add_endpoint (addr_, (own_t *) listener); add_endpoint (addr_, (own_t *) listener, NULL);
return 0; return 0;
} }
...@@ -395,7 +395,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -395,7 +395,7 @@ int zmq::socket_base_t::bind (const char *addr_)
// Save last endpoint URI // Save last endpoint URI
listener->get_address (last_endpoint); listener->get_address (last_endpoint);
add_endpoint (addr_, (own_t *) listener); add_endpoint (addr_, (own_t *) listener, NULL);
return 0; return 0;
} }
#endif #endif
...@@ -548,6 +548,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -548,6 +548,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// PGM does not support subscription forwarding; ask for all data to be // PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe. // sent to this pipe.
bool icanhasall = protocol == "pgm" || protocol == "epgm"; bool icanhasall = protocol == "pgm" || protocol == "epgm";
pipe_t *newpipe = NULL;
if (options.immediate != 1 || icanhasall) { if (options.immediate != 1 || icanhasall) {
// Create a bi-directional pipe. // Create a bi-directional pipe.
...@@ -560,6 +561,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -560,6 +561,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach local end of the pipe to the socket object. // Attach local end of the pipe to the socket object.
attach_pipe (new_pipes [0], icanhasall); attach_pipe (new_pipes [0], icanhasall);
newpipe = new_pipes [0];
// Attach remote end of the pipe to the session object later on. // Attach remote end of the pipe to the session object later on.
session->attach_pipe (new_pipes [1]); session->attach_pipe (new_pipes [1]);
...@@ -568,15 +570,15 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -568,15 +570,15 @@ int zmq::socket_base_t::connect (const char *addr_)
// Save last endpoint URI // Save last endpoint URI
paddr->to_string (last_endpoint); paddr->to_string (last_endpoint);
add_endpoint (addr_, (own_t *) session); add_endpoint (addr_, (own_t *) session, newpipe);
return 0; return 0;
} }
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_) void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
{ {
// Activate the session. Make it a child of this socket. // Activate the session. Make it a child of this socket.
launch_child (endpoint_); launch_child (endpoint_);
endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_)); endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t(endpoint_, pipe)));
} }
int zmq::socket_base_t::term_endpoint (const char *addr_) int zmq::socket_base_t::term_endpoint (const char *addr_)
...@@ -631,8 +633,12 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) ...@@ -631,8 +633,12 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
return -1; return -1;
} }
for (endpoints_t::iterator it = range.first; it != range.second; ++it) for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
term_child (it->second); // If we have an associated pipe, terminate it.
if (it->second.second != NULL)
it->second.second->terminate(false);
term_child (it->second.first);
}
endpoints.erase (range.first, range.second); endpoints.erase (range.first, range.second);
return 0; return 0;
} }
......
...@@ -158,10 +158,11 @@ namespace zmq ...@@ -158,10 +158,11 @@ namespace zmq
private: private:
// Creates new endpoint ID and adds the endpoint to the map. // Creates new endpoint ID and adds the endpoint to the map.
void add_endpoint (const char *addr_, own_t *endpoint_); void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe);
// Map of open endpoints. // Map of open endpoints.
typedef std::multimap <std::string, own_t *> endpoints_t; typedef std::pair <own_t *, pipe_t*> endpoint_pipe_t;
typedef std::multimap <std::string, endpoint_pipe_t> endpoints_t;
endpoints_t endpoints; endpoints_t endpoints;
// Map of open inproc endpoints. // Map of open inproc endpoints.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment