Commit 5cf74db6 authored by Joe Eli McIlvain's avatar Joe Eli McIlvain

Add failing test reproducing issue #1015.

There is a race condition when connect and bind on a new inproc
endpoint happen "simultaneously" in threads.  Causes the error:
  Assertion failed: ok (ctx.cpp:474)
parent 7ec7f703
...@@ -36,6 +36,40 @@ static void pusher (void *ctx) ...@@ -36,6 +36,40 @@ static void pusher (void *ctx)
assert (rc == 0); assert (rc == 0);
} }
static void simult_conn (void *payload)
{
// Pull out arguments - context followed by endpoint string
void* ctx = (void*)((void**)payload)[0];
char* endpt = (char*)((void**)payload)[1];
// Connect
void *connectSocket = zmq_socket (ctx, ZMQ_SUB);
assert (connectSocket);
int rc = zmq_connect (connectSocket, endpt);
assert (rc == 0);
// Cleanup
rc = zmq_close (connectSocket);
assert (rc == 0);
}
static void simult_bind (void *payload)
{
// Pull out arguments - context followed by endpoint string
void* ctx = (void*)((void**)payload)[0];
char* endpt = (char*)((void**)payload)[1];
// Bind
void *bindSocket = zmq_socket (ctx, ZMQ_PUB);
assert (bindSocket);
int rc = zmq_bind (bindSocket, endpt);
assert (rc == 0);
// Cleanup
rc = zmq_close (bindSocket);
assert (rc == 0);
}
void test_bind_before_connect () void test_bind_before_connect ()
{ {
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
...@@ -268,6 +302,42 @@ void test_multiple_threads () ...@@ -268,6 +302,42 @@ void test_multiple_threads ()
assert (rc == 0); assert (rc == 0);
} }
void test_simultaneous_connect_bind_threads ()
{
const unsigned int no_of_times = 50;
void *ctx = zmq_ctx_new ();
assert (ctx);
void *threads[no_of_times*2];
void *thr_args[no_of_times][2];
char endpts[no_of_times][20];
// Set up thread arguments: context followed by endpoint string
for (unsigned int i = 0; i < no_of_times; ++i)
{
thr_args[i][0] = (void*) ctx;
thr_args[i][1] = (void*) endpts[i];
sprintf (endpts[i], "inproc://foo_%d", i);
}
// Spawn all threads as simultaneously as possible
for (unsigned int i = 0; i < no_of_times; ++i)
{
threads[i*2+0] = zmq_threadstart (&simult_conn, (void*)thr_args[i]);
threads[i*2+1] = zmq_threadstart (&simult_bind, (void*)thr_args[i]);
}
// Close all threads
for (unsigned int i = 0; i < no_of_times; ++i)
{
zmq_threadclose (threads[i*2+0]);
zmq_threadclose (threads[i*2+1]);
}
int rc = zmq_ctx_term (ctx);
assert (rc == 0);
}
void test_identity () void test_identity ()
{ {
// Create the infrastructure // Create the infrastructure
...@@ -350,6 +420,7 @@ int main (void) ...@@ -350,6 +420,7 @@ int main (void)
test_connect_before_bind_pub_sub (); test_connect_before_bind_pub_sub ();
test_multiple_connects (); test_multiple_connects ();
test_multiple_threads (); test_multiple_threads ();
test_simultaneous_connect_bind_threads ();
test_identity (); test_identity ();
test_connect_only (); test_connect_only ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment