Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
b2f6741b
Commit
b2f6741b
authored
Dec 01, 2012
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #481 from ianbarber/master
Simplify test_connect_delay
parents
013a99da
75161b5c
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
106 additions
and
109 deletions
+106
-109
test_connect_delay.cpp
tests/test_connect_delay.cpp
+106
-109
No files found.
tests/test_connect_delay.cpp
View file @
b2f6741b
...
@@ -25,114 +25,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
...
@@ -25,114 +25,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#include <string.h>
#include <string.h>
#include <unistd.h>
#include <unistd.h>
#include <string>
#include <string>
#include <pthread.h>
#undef NDEBUG
#undef NDEBUG
#include <assert.h>
#include <assert.h>
static
void
*
server
(
void
*
)
{
void
*
socket
,
*
context
;
char
buffer
[
16
];
int
rc
,
val
;
context
=
zmq_init
(
1
);
assert
(
context
);
socket
=
zmq_socket
(
context
,
ZMQ_PULL
);
assert
(
socket
);
val
=
0
;
rc
=
zmq_setsockopt
(
socket
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
socket
,
"ipc:///tmp/recon"
);
assert
(
rc
==
0
);
memset
(
&
buffer
,
0
,
sizeof
(
buffer
));
rc
=
zmq_recv
(
socket
,
&
buffer
,
sizeof
(
buffer
),
0
);
// Intentionally bail out
rc
=
zmq_close
(
socket
);
assert
(
rc
==
0
);
rc
=
zmq_term
(
context
);
assert
(
rc
==
0
);
usleep
(
200000
);
context
=
zmq_init
(
1
);
assert
(
context
);
socket
=
zmq_socket
(
context
,
ZMQ_PULL
);
assert
(
socket
);
val
=
0
;
rc
=
zmq_setsockopt
(
socket
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
socket
,
"ipc:///tmp/recon"
);
assert
(
rc
==
0
);
usleep
(
200000
);
memset
(
&
buffer
,
0
,
sizeof
(
buffer
));
rc
=
zmq_recv
(
socket
,
&
buffer
,
sizeof
(
buffer
),
ZMQ_DONTWAIT
);
assert
(
rc
!=
-
1
);
// Start closing the socket while the connecting process is underway.
rc
=
zmq_close
(
socket
);
assert
(
rc
==
0
);
rc
=
zmq_term
(
context
);
assert
(
rc
==
0
);
return
NULL
;
}
static
void
*
worker
(
void
*
)
{
void
*
socket
,
*
context
;
int
rc
,
hadone
,
val
;
context
=
zmq_init
(
1
);
assert
(
context
);
socket
=
zmq_socket
(
context
,
ZMQ_PUSH
);
assert
(
socket
);
val
=
0
;
rc
=
zmq_setsockopt
(
socket
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
val
=
1
;
rc
=
zmq_setsockopt
(
socket
,
ZMQ_DELAY_ATTACH_ON_CONNECT
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
socket
,
"ipc:///tmp/recon"
);
assert
(
rc
==
0
);
hadone
=
0
;
// Not checking RC as some may be -1
for
(
int
i
=
0
;
i
<
6
;
i
++
)
{
usleep
(
200000
);
rc
=
zmq_send
(
socket
,
"hi"
,
2
,
ZMQ_DONTWAIT
);
if
(
rc
!=
-
1
)
hadone
++
;
}
assert
(
hadone
>=
2
);
assert
(
hadone
<
4
);
rc
=
zmq_close
(
socket
);
assert
(
rc
==
0
);
rc
=
zmq_term
(
context
);
assert
(
rc
==
0
);
return
NULL
;
}
int
main
(
void
)
int
main
(
void
)
{
{
fprintf
(
stderr
,
"test_connect_delay running...
\n
"
);
fprintf
(
stderr
,
"test_connect_delay running...
\n
"
);
...
@@ -141,11 +37,19 @@ int main (void)
...
@@ -141,11 +37,19 @@ int main (void)
char
buffer
[
16
];
char
buffer
[
16
];
int
seen
=
0
;
int
seen
=
0
;
// TEST 1.
// First we're going to attempt to send messages to two
// pipes, one connected, the other not. We should see
// the PUSH load balancing to both pipes, and hence half
// of the messages getting queued, as connect() creates a
// pipe immediately.
void
*
context
=
zmq_ctx_new
();
void
*
context
=
zmq_ctx_new
();
assert
(
context
);
assert
(
context
);
void
*
to
=
zmq_socket
(
context
,
ZMQ_PULL
);
void
*
to
=
zmq_socket
(
context
,
ZMQ_PULL
);
assert
(
to
);
assert
(
to
);
// Bind the one valid receiver
val
=
0
;
val
=
0
;
rc
=
zmq_setsockopt
(
to
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
rc
=
zmq_setsockopt
(
to
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
@@ -158,11 +62,15 @@ int main (void)
...
@@ -158,11 +62,15 @@ int main (void)
val
=
0
;
val
=
0
;
zmq_setsockopt
(
from
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
zmq_setsockopt
(
from
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
// This pipe will not connect
rc
=
zmq_connect
(
from
,
"tcp://localhost:5556"
);
rc
=
zmq_connect
(
from
,
"tcp://localhost:5556"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// This pipe will
rc
=
zmq_connect
(
from
,
"tcp://localhost:5555"
);
rc
=
zmq_connect
(
from
,
"tcp://localhost:5555"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// We send 10 messages, 5 should just get stuck in the queue
// for the not-yet-connected pipe
for
(
int
i
=
0
;
i
<
10
;
++
i
)
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
{
std
::
string
message
(
"message "
);
std
::
string
message
(
"message "
);
...
@@ -171,7 +79,11 @@ int main (void)
...
@@ -171,7 +79,11 @@ int main (void)
assert
(
rc
>=
0
);
assert
(
rc
>=
0
);
}
}
// Sleep to allow the messages to be delivered
zmq_sleep
(
1
);
zmq_sleep
(
1
);
// We now consume from the connected pipe
// - we should see just 5
seen
=
0
;
seen
=
0
;
for
(
int
i
=
0
;
i
<
10
;
++
i
)
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
{
...
@@ -192,9 +104,17 @@ int main (void)
...
@@ -192,9 +104,17 @@ int main (void)
rc
=
zmq_ctx_destroy
(
context
);
rc
=
zmq_ctx_destroy
(
context
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// TEST 2
// This time we will do the same thing, connect two pipes,
// one of which will succeed in connecting to a bound
// receiver, the other of which will fail. However, we will
// also set the delay attach on connect flag, which should
// cause the pipe attachment to be delayed until the connection
// succeeds.
context
=
zmq_ctx_new
();
context
=
zmq_ctx_new
();
fprintf
(
stderr
,
" Rerunning with DELAY_ATTACH_ON_CONNECT
\n
"
);
fprintf
(
stderr
,
" Rerunning with DELAY_ATTACH_ON_CONNECT
\n
"
);
// Bind the valid socket
to
=
zmq_socket
(
context
,
ZMQ_PULL
);
to
=
zmq_socket
(
context
,
ZMQ_PULL
);
assert
(
to
);
assert
(
to
);
rc
=
zmq_bind
(
to
,
"tcp://*:5560"
);
rc
=
zmq_bind
(
to
,
"tcp://*:5560"
);
...
@@ -212,16 +132,19 @@ int main (void)
...
@@ -212,16 +132,19 @@ int main (void)
rc
=
zmq_setsockopt
(
from
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
rc
=
zmq_setsockopt
(
from
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Set the key flag
val
=
1
;
val
=
1
;
rc
=
zmq_setsockopt
(
from
,
ZMQ_DELAY_ATTACH_ON_CONNECT
,
&
val
,
sizeof
(
val
));
rc
=
zmq_setsockopt
(
from
,
ZMQ_DELAY_ATTACH_ON_CONNECT
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Connect to the invalid socket
rc
=
zmq_connect
(
from
,
"tcp://localhost:5561"
);
rc
=
zmq_connect
(
from
,
"tcp://localhost:5561"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Connect to the valid socket
rc
=
zmq_connect
(
from
,
"tcp://localhost:5560"
);
rc
=
zmq_connect
(
from
,
"tcp://localhost:5560"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Send 10 messages, all should be routed to the connected pipe
for
(
int
i
=
0
;
i
<
10
;
++
i
)
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
{
std
::
string
message
(
"message "
);
std
::
string
message
(
"message "
);
...
@@ -230,13 +153,16 @@ int main (void)
...
@@ -230,13 +153,16 @@ int main (void)
assert
(
rc
>=
0
);
assert
(
rc
>=
0
);
}
}
// Sleep to allow the messages to be delivered
zmq_sleep
(
1
);
zmq_sleep
(
1
);
// Send 10 messages, all should arrive.
seen
=
0
;
seen
=
0
;
for
(
int
i
=
0
;
i
<
10
;
++
i
)
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
{
memset
(
&
buffer
,
0
,
sizeof
(
buffer
));
memset
(
&
buffer
,
0
,
sizeof
(
buffer
));
rc
=
zmq_recv
(
to
,
&
buffer
,
sizeof
(
buffer
),
ZMQ_DONTWAIT
);
rc
=
zmq_recv
(
to
,
&
buffer
,
sizeof
(
buffer
),
ZMQ_DONTWAIT
);
// If there is a failed delivery, assert!
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
}
}
...
@@ -249,15 +175,86 @@ int main (void)
...
@@ -249,15 +175,86 @@ int main (void)
rc
=
zmq_ctx_destroy
(
context
);
rc
=
zmq_ctx_destroy
(
context
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// TEST 3
// This time we want to validate that the same blocking behaviour
// occurs with an existing connection that is broken. We will send
// messaages to a connected pipe, disconnect and verify the messages
// block. Then we reconnect and verify messages flow again.
context
=
zmq_ctx_new
();
void
*
context2
=
zmq_ctx_new
();
fprintf
(
stderr
,
" Running DELAY_ATTACH_ON_CONNECT with disconnect
\n
"
);
fprintf
(
stderr
,
" Running DELAY_ATTACH_ON_CONNECT with disconnect
\n
"
);
pthread_t
serv
,
work
;
to
=
zmq_socket
(
context2
,
ZMQ_PULL
);
assert
(
to
);
rc
=
zmq_bind
(
to
,
"tcp://*:5560"
);
assert
(
rc
==
0
);
val
=
0
;
rc
=
zmq_setsockopt
(
to
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
// Create a socket pushing
from
=
zmq_socket
(
context
,
ZMQ_PUSH
);
assert
(
from
);
val
=
0
;
rc
=
zmq_setsockopt
(
from
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
val
=
1
;
rc
=
zmq_setsockopt
(
from
,
ZMQ_DELAY_ATTACH_ON_CONNECT
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
// Connect to the valid socket socket
rc
=
zmq_connect
(
from
,
"tcp://localhost:5560"
);
assert
(
rc
==
0
);
// Allow connections to stabilise
zmq_sleep
(
1
);
// Send a message, should succeed
std
::
string
message
(
"message "
);
rc
=
zmq_send
(
from
,
message
.
data
(),
message
.
size
(),
0
);
assert
(
rc
>=
0
);
rc
=
zmq_close
(
to
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_destroy
(
context2
);
assert
(
rc
==
0
);
// Give time to process disconnect
zmq_sleep
(
1
);
// Send a message, should fail
rc
=
zmq_send
(
from
,
message
.
data
(),
message
.
size
(),
ZMQ_DONTWAIT
);
assert
(
rc
==
-
1
);
context2
=
zmq_ctx_new
();
to
=
zmq_socket
(
context2
,
ZMQ_PULL
);
assert
(
to
);
rc
=
zmq_bind
(
to
,
"tcp://*:5560"
);
assert
(
rc
==
0
);
rc
=
pthread_create
(
&
serv
,
NULL
,
server
,
NULL
);
val
=
0
;
rc
=
zmq_setsockopt
(
to
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
// Allow connections to stabilise
zmq_sleep
(
1
);
// After the reconnect, should succeed
rc
=
zmq_send
(
from
,
message
.
data
(),
message
.
size
(),
0
);
assert
(
rc
>=
0
);
rc
=
zmq_close
(
to
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
from
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
pthread_create
(
&
work
,
NULL
,
worker
,
NULL
);
rc
=
zmq_ctx_destroy
(
context
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
pthread_exit
(
NULL
);
rc
=
zmq_ctx_destroy
(
context2
);
assert
(
rc
==
0
);
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment