Commit ebd1bf9b authored by Martin Hurton's avatar Martin Hurton

Merge pull request #482 from sradomski/master

Close pipes for inproc sockets on zmq_disconnect
parents b2f6741b 21345ffa
......@@ -66,6 +66,7 @@ Pieter Hintjens <ph@imatix.com>
Piotr Trojanek <piotr.trojanek@gmail.com>
Robert G. Jakabosky <bobby@sharedrealm.com>
Sebastian Otaegui <feniix@gmail.com>
Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de>
Steven McCoy <steven.mccoy@miru.hk>
Stuart Webster <sw_webster@hotmail.com>
Tamara Kustarova <kustarova.tamara@gmail.com>
......
......@@ -34,6 +34,8 @@ The endpoint supplied is invalid.
The 0MQ 'context' associated with the specified 'socket' was terminated.
*ENOTSOCK*::
The provided 'socket' was invalid.
*ENOENT*::
The provided endpoint is not connected.
EXAMPLE
......
......@@ -478,6 +478,9 @@ int zmq::socket_base_t::connect (const char *addr_)
// Save last endpoint URI
options.last_endpoint.assign (addr_);
// remember inproc connections for disconnect
inprocs.insert (inprocs_t::value_type (std::string (addr_), pipes[0]));
return 0;
}
......@@ -584,10 +587,37 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
if (unlikely (rc != 0))
return -1;
// Parse addr_ string.
std::string protocol;
std::string address;
rc = parse_uri (addr_, protocol, address);
if (rc != 0)
return -1;
rc = check_protocol (protocol);
if (rc != 0)
return -1;
// Disconnect an inproc socket
if (protocol == "inproc") {
std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
if (range.first == range.second) {
errno = ENOENT;
return -1;
}
for (inprocs_t::iterator it = range.first; it != range.second; ++it)
it->second->terminate(true);
inprocs.erase (range.first, range.second);
return 0;
}
// Find the endpoints range (if any) corresponding to the addr_ string.
std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
if (range.first == range.second)
if (range.first == range.second) {
errno = ENOENT;
return -1;
}
for (endpoints_t::iterator it = range.first; it != range.second; ++it)
term_child (it->second);
......@@ -980,6 +1010,14 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
// Notify the specific socket type about the pipe termination.
xterminated (pipe_);
// Remove pipe from inproc pipes
for (inprocs_t::iterator it = inprocs.begin(); it != inprocs.end(); ++it) {
if (it->second == pipe_) {
inprocs.erase(it);
break;
}
}
// Remove the pipe from the list of attached pipes and confirm its
// termination if we are already shutting down.
pipes.erase (pipe_);
......
......@@ -170,6 +170,10 @@ namespace zmq
typedef std::multimap <std::string, own_t *> endpoints_t;
endpoints_t endpoints;
// Map of open inproc endpoints.
typedef std::multimap <std::string, pipe_t *> inprocs_t;
inprocs_t inprocs;
// To be called after processing commands or invoking any command
// handlers explicitly. If required, it will deallocate the socket.
void check_destroy ();
......
......@@ -18,7 +18,9 @@ noinst_PROGRAMS = test_pair_inproc \
test_term_endpoint \
test_monitor \
test_router_mandatory \
test_raw_sock
test_raw_sock \
test_disconnect_inproc
if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \
......@@ -43,6 +45,7 @@ test_term_endpoint_SOURCES = test_term_endpoint.cpp
test_monitor_SOURCES = test_monitor.cpp
test_router_mandatory_SOURCES = test_router_mandatory.cpp
test_raw_sock_SOURCES = test_raw_sock.cpp
test_disconnect_inproc_SOURCES = test_disconnect_inproc.cpp
if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
......
#include <zmq.h>
#include <inttypes.h>
#include <string.h>
#include <assert.h>
/// Initialize a zeromq message with a given null-terminated string
#define ZMQ_PREPARE_STRING(msg, data, size) \
zmq_msg_init(&msg) && printf("zmq_msg_init: %s\n", zmq_strerror(errno)); \
zmq_msg_init_size (&msg, size + 1) && printf("zmq_msg_init_size: %s\n",zmq_strerror(errno)); \
memcpy(zmq_msg_data(&msg), data, size + 1);
int publicationsReceived = 0;
bool isSubscribed = false;
int main(int argc, char** argv) {
void* context = zmq_ctx_new();
void* pubSocket;
void* subSocket;
(pubSocket = zmq_socket(context, ZMQ_XPUB)) || printf("zmq_socket: %s\n", zmq_strerror(errno));
(subSocket = zmq_socket(context, ZMQ_SUB)) || printf("zmq_socket: %s\n", zmq_strerror(errno));
zmq_setsockopt(subSocket, ZMQ_SUBSCRIBE, "foo", 3) && printf("zmq_setsockopt: %s\n",zmq_strerror(errno));
zmq_bind(pubSocket, "inproc://someInProcDescriptor") && printf("zmq_bind: %s\n", zmq_strerror(errno));
//zmq_bind(pubSocket, "tcp://*:30010") && printf("zmq_bind: %s\n", zmq_strerror(errno));
int32_t more;
size_t more_size = sizeof(more);
int iteration = 0;
while(1) {
zmq_pollitem_t items [] = {
{ subSocket, 0, ZMQ_POLLIN, 0 }, // read publications
{ pubSocket, 0, ZMQ_POLLIN, 0 }, // read subscriptions
};
zmq_poll(items, 2, 500);
if (items[1].revents & ZMQ_POLLIN) {
while (1) {
zmq_msg_t msg;
zmq_msg_init (&msg);
zmq_msg_recv (&msg, pubSocket, 0);
int msgSize = zmq_msg_size(&msg);
char* buffer = (char*)zmq_msg_data(&msg);
if (buffer[0] == 0) {
assert(isSubscribed);
printf("unsubscribing from '%s'\n", strndup(buffer + 1, msgSize - 1));
isSubscribed = false;
} else {
assert(!isSubscribed);
printf("subscribing on '%s'\n", strndup(buffer + 1, msgSize - 1));
isSubscribed = true;
}
zmq_getsockopt (pubSocket, ZMQ_RCVMORE, &more, &more_size);
zmq_msg_close (&msg);
if (!more)
break; // Last message part
}
}
if (items[0].revents & ZMQ_POLLIN) {
while (1) {
zmq_msg_t msg;
zmq_msg_init (&msg);
zmq_msg_recv (&msg, subSocket, 0);
int msgSize = zmq_msg_size(&msg);
char* buffer = (char*)zmq_msg_data(&msg);
printf("received on subscriber '%s'\n", strndup(buffer, msgSize));
zmq_getsockopt (subSocket, ZMQ_RCVMORE, &more, &more_size);
zmq_msg_close (&msg);
if (!more) {
publicationsReceived++;
break; // Last message part
}
}
}
if (iteration == 1) {
zmq_connect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_connect: %s\n", zmq_strerror(errno));
//zmq_connect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_connect: %s\n", zmq_strerror(errno));
}
if (iteration == 4) {
zmq_disconnect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_disconnect(%d): %s\n", errno, zmq_strerror(errno));
//zmq_disconnect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_disconnect: %s\n", zmq_strerror(errno));
}
if (iteration == 10) {
break;
}
zmq_msg_t channelEnvlp;
ZMQ_PREPARE_STRING(channelEnvlp, "foo", 3);
zmq_sendmsg(pubSocket, &channelEnvlp, ZMQ_SNDMORE) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno));
zmq_msg_close(&channelEnvlp) && printf("zmq_msg_close: %s\n",zmq_strerror(errno));
zmq_msg_t message;
ZMQ_PREPARE_STRING(message, "this is foo!", 12);
zmq_sendmsg(pubSocket, &message, 0) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno));
zmq_msg_close(&message) && printf("zmq_msg_close: %s\n",zmq_strerror(errno));
iteration++;
}
assert(publicationsReceived == 3);
assert(!isSubscribed);
zmq_close(pubSocket) && printf("zmq_close: %s", zmq_strerror(errno));
zmq_close(subSocket) && printf("zmq_close: %s", zmq_strerror(errno));
zmq_ctx_destroy(context);
return 0;
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment