Commit 668c42fc authored by Richard Newton's avatar Richard Newton

Fix hang on term when inproc is connected but never bound.

parent 6150812f
...@@ -108,6 +108,14 @@ int zmq::ctx_t::terminate () ...@@ -108,6 +108,14 @@ int zmq::ctx_t::terminate ()
term_mailbox.forked(); term_mailbox.forked();
} }
#endif #endif
// Connect up any pending inproc connections, otherwise we will hang
pending_connections_t copy = pending_connections;
for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
s->bind (p->first.c_str ());
s->close ();
}
// Check whether termination was already underway, but interrupted and now // Check whether termination was already underway, but interrupted and now
// restarted. // restarted.
bool restarted = terminating; bool restarted = terminating;
......
...@@ -36,7 +36,7 @@ static void pusher (void *ctx) ...@@ -36,7 +36,7 @@ static void pusher (void *ctx)
assert (rc == 0); assert (rc == 0);
} }
void test_bind_before_connect() void test_bind_before_connect ()
{ {
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
...@@ -52,7 +52,7 @@ void test_bind_before_connect() ...@@ -52,7 +52,7 @@ void test_bind_before_connect()
assert (connectSocket); assert (connectSocket);
rc = zmq_connect (connectSocket, "inproc://a"); rc = zmq_connect (connectSocket, "inproc://a");
assert (rc == 0); assert (rc == 0);
// Queue up some data // Queue up some data
rc = zmq_send_const (connectSocket, "foobar", 6, 0); rc = zmq_send_const (connectSocket, "foobar", 6, 0);
assert (rc == 6); assert (rc == 6);
...@@ -77,7 +77,7 @@ void test_bind_before_connect() ...@@ -77,7 +77,7 @@ void test_bind_before_connect()
assert (rc == 0); assert (rc == 0);
} }
void test_connect_before_bind() void test_connect_before_bind ()
{ {
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
...@@ -97,7 +97,7 @@ void test_connect_before_bind() ...@@ -97,7 +97,7 @@ void test_connect_before_bind()
assert (bindSocket); assert (bindSocket);
rc = zmq_bind (bindSocket, "inproc://a"); rc = zmq_bind (bindSocket, "inproc://a");
assert (rc == 0); assert (rc == 0);
// Read pending message // Read pending message
zmq_msg_t msg; zmq_msg_t msg;
rc = zmq_msg_init (&msg); rc = zmq_msg_init (&msg);
...@@ -118,7 +118,7 @@ void test_connect_before_bind() ...@@ -118,7 +118,7 @@ void test_connect_before_bind()
assert (rc == 0); assert (rc == 0);
} }
void test_connect_before_bind_pub_sub() void test_connect_before_bind_pub_sub ()
{ {
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
...@@ -140,7 +140,7 @@ void test_connect_before_bind_pub_sub() ...@@ -140,7 +140,7 @@ void test_connect_before_bind_pub_sub()
assert (rc == 0); assert (rc == 0);
rc = zmq_bind (bindSocket, "inproc://a"); rc = zmq_bind (bindSocket, "inproc://a");
assert (rc == 0); assert (rc == 0);
// Wait for pub-sub connection to happen // Wait for pub-sub connection to happen
msleep (SETTLE_TIME); msleep (SETTLE_TIME);
...@@ -168,14 +168,14 @@ void test_connect_before_bind_pub_sub() ...@@ -168,14 +168,14 @@ void test_connect_before_bind_pub_sub()
assert (rc == 0); assert (rc == 0);
} }
void test_multiple_connects() void test_multiple_connects ()
{ {
const unsigned int no_of_connects = 10; const unsigned int no_of_connects = 10;
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
int rc; int rc;
void *connectSocket[no_of_connects]; void *connectSocket [no_of_connects];
// Connect first // Connect first
for (unsigned int i = 0; i < no_of_connects; ++i) for (unsigned int i = 0; i < no_of_connects; ++i)
...@@ -195,7 +195,7 @@ void test_multiple_connects() ...@@ -195,7 +195,7 @@ void test_multiple_connects()
assert (bindSocket); assert (bindSocket);
rc = zmq_bind (bindSocket, "inproc://a"); rc = zmq_bind (bindSocket, "inproc://a");
assert (rc == 0); assert (rc == 0);
for (unsigned int i = 0; i < no_of_connects; ++i) for (unsigned int i = 0; i < no_of_connects; ++i)
{ {
// Read pending message // Read pending message
...@@ -222,7 +222,7 @@ void test_multiple_connects() ...@@ -222,7 +222,7 @@ void test_multiple_connects()
assert (rc == 0); assert (rc == 0);
} }
void test_multiple_threads() void test_multiple_threads ()
{ {
const unsigned int no_of_threads = 30; const unsigned int no_of_threads = 30;
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
...@@ -268,21 +268,21 @@ void test_multiple_threads() ...@@ -268,21 +268,21 @@ void test_multiple_threads()
assert (rc == 0); assert (rc == 0);
} }
void test_identity() void test_identity ()
{ {
// Create the infrastructure // Create the infrastructure
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
void *sc = zmq_socket (ctx, ZMQ_DEALER); void *sc = zmq_socket (ctx, ZMQ_DEALER);
assert (sc); assert (sc);
int rc = zmq_connect (sc, "inproc://a"); int rc = zmq_connect (sc, "inproc://a");
assert (rc == 0); assert (rc == 0);
void *sb = zmq_socket (ctx, ZMQ_ROUTER); void *sb = zmq_socket (ctx, ZMQ_ROUTER);
assert (sb); assert (sb);
rc = zmq_bind (sb, "inproc://a"); rc = zmq_bind (sb, "inproc://a");
assert (rc == 0); assert (rc == 0);
...@@ -316,17 +316,34 @@ void test_identity() ...@@ -316,17 +316,34 @@ void test_identity()
// Deallocate the infrastructure. // Deallocate the infrastructure.
rc = zmq_close (sc); rc = zmq_close (sc);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (sb); rc = zmq_close (sb);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
}
void test_connect_only ()
{
void *ctx = zmq_ctx_new ();
assert (ctx);
void *connectSocket = zmq_socket (ctx, ZMQ_PUSH);
assert (connectSocket);
int rc = zmq_connect (connectSocket, "inproc://a");
assert (rc == 0);
rc = zmq_close (connectSocket);
assert (rc == 0);
rc = zmq_ctx_term (ctx); rc = zmq_ctx_term (ctx);
assert (rc == 0); assert (rc == 0);
} }
int main (void) int main (void)
{ {
setup_test_environment(); setup_test_environment ();
test_bind_before_connect (); test_bind_before_connect ();
test_connect_before_bind (); test_connect_before_bind ();
...@@ -334,6 +351,7 @@ int main (void) ...@@ -334,6 +351,7 @@ int main (void)
test_multiple_connects (); test_multiple_connects ();
test_multiple_threads (); test_multiple_threads ();
test_identity (); test_identity ();
test_connect_only ();
return 0; return 0;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment