Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
8797af77
Commit
8797af77
authored
Nov 06, 2013
by
Richard Newton
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #737 from hintjens/master
Removed over-long pauses in tests
parents
c2c6ec19
5b60540e
Hide whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
126 additions
and
151 deletions
+126
-151
test_conflate.cpp
tests/test_conflate.cpp
+2
-5
test_connect_delay_tipc.cpp
tests/test_connect_delay_tipc.cpp
+1
-3
test_ctx_destroy.cpp
tests/test_ctx_destroy.cpp
+1
-1
test_immediate.cpp
tests/test_immediate.cpp
+1
-2
test_inproc_connect.cpp
tests/test_inproc_connect.cpp
+1
-1
test_iov.cpp
tests/test_iov.cpp
+1
-1
test_monitor.cpp
tests/test_monitor.cpp
+3
-3
test_proxy.cpp
tests/test_proxy.cpp
+97
-113
test_req_relaxed.cpp
tests/test_req_relaxed.cpp
+1
-1
test_spec_req.cpp
tests/test_spec_req.cpp
+1
-1
test_sub_forward.cpp
tests/test_sub_forward.cpp
+1
-1
test_sub_forward_tipc.cpp
tests/test_sub_forward_tipc.cpp
+1
-1
test_term_endpoint.cpp
tests/test_term_endpoint.cpp
+2
-2
test_term_endpoint_tipc.cpp
tests/test_term_endpoint_tipc.cpp
+2
-3
testutil.hpp
tests/testutil.hpp
+11
-13
No files found.
tests/test_conflate.cpp
View file @
8797af77
...
@@ -45,7 +45,6 @@ int main (int argc, char *argv [])
...
@@ -45,7 +45,6 @@ int main (int argc, char *argv [])
assert
(
rc
==
0
);
assert
(
rc
==
0
);
int
message_count
=
20
;
int
message_count
=
20
;
for
(
int
j
=
0
;
j
<
message_count
;
++
j
)
{
for
(
int
j
=
0
;
j
<
message_count
;
++
j
)
{
rc
=
zmq_send
(
s_out
,
(
void
*
)
&
j
,
sizeof
(
int
),
0
);
rc
=
zmq_send
(
s_out
,
(
void
*
)
&
j
,
sizeof
(
int
),
0
);
if
(
rc
<
0
)
{
if
(
rc
<
0
)
{
...
@@ -53,15 +52,13 @@ int main (int argc, char *argv [])
...
@@ -53,15 +52,13 @@ int main (int argc, char *argv [])
return
-
1
;
return
-
1
;
}
}
}
}
msleep
(
SETTLE_TIME
);
zmq_sleep
(
1
);
int
payload_recved
=
0
;
int
payload_recved
=
0
;
rc
=
zmq_recv
(
s_in
,
(
void
*
)
&
payload_recved
,
sizeof
(
int
),
0
);
rc
=
zmq_recv
(
s_in
,
(
void
*
)
&
payload_recved
,
sizeof
(
int
),
0
);
assert
(
rc
>
0
);
assert
(
rc
>
0
);
assert
(
payload_recved
==
message_count
-
1
);
assert
(
payload_recved
==
message_count
-
1
);
rc
=
zmq_close
(
s_in
);
rc
=
zmq_close
(
s_in
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
...
tests/test_connect_delay_tipc.cpp
View file @
8797af77
...
@@ -200,9 +200,7 @@ int main (void)
...
@@ -200,9 +200,7 @@ int main (void)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Give time to process disconnect
// Give time to process disconnect
// There's no way to do this except with a sleep
msleep
(
SETTLE_TIME
);
struct
timespec
t
=
{
0
,
250
*
1000000
};
nanosleep
(
&
t
,
NULL
);
// Send a message, should fail
// Send a message, should fail
rc
=
zmq_send
(
frontend
,
"Hello"
,
5
,
ZMQ_DONTWAIT
);
rc
=
zmq_send
(
frontend
,
"Hello"
,
5
,
ZMQ_DONTWAIT
);
...
...
tests/test_ctx_destroy.cpp
View file @
8797af77
...
@@ -61,7 +61,7 @@ void test_ctx_shutdown()
...
@@ -61,7 +61,7 @@ void test_ctx_shutdown()
void
*
receiver_thread
=
zmq_threadstart
(
&
receiver
,
socket
);
void
*
receiver_thread
=
zmq_threadstart
(
&
receiver
,
socket
);
// Wait for thread to start up and block
// Wait for thread to start up and block
zmq_sleep
(
1
);
msleep
(
SETTLE_TIME
);
// Shutdown context, if we used destroy here we would deadlock.
// Shutdown context, if we used destroy here we would deadlock.
rc
=
zmq_ctx_shutdown
(
ctx
);
rc
=
zmq_ctx_shutdown
(
ctx
);
...
...
tests/test_immediate.cpp
View file @
8797af77
...
@@ -193,8 +193,7 @@ int main (void)
...
@@ -193,8 +193,7 @@ int main (void)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Give time to process disconnect
// Give time to process disconnect
// There's no way to do this except with a sleep
msleep
(
SETTLE_TIME
);
zmq_sleep
(
1
);
// Send a message, should fail
// Send a message, should fail
rc
=
zmq_send
(
frontend
,
"Hello"
,
5
,
ZMQ_DONTWAIT
);
rc
=
zmq_send
(
frontend
,
"Hello"
,
5
,
ZMQ_DONTWAIT
);
...
...
tests/test_inproc_connect.cpp
View file @
8797af77
...
@@ -142,7 +142,7 @@ void test_connect_before_bind_pub_sub()
...
@@ -142,7 +142,7 @@ void test_connect_before_bind_pub_sub()
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Wait for pub-sub connection to happen
// Wait for pub-sub connection to happen
zmq_sleep
(
1
);
msleep
(
SETTLE_TIME
);
// Queue up some data, this not will be dropped
// Queue up some data, this not will be dropped
rc
=
zmq_send_const
(
connectSocket
,
"after"
,
6
,
0
);
rc
=
zmq_send_const
(
connectSocket
,
"after"
,
6
,
0
);
...
...
tests/test_iov.cpp
View file @
8797af77
...
@@ -80,7 +80,7 @@ int main (void)
...
@@ -80,7 +80,7 @@ int main (void)
rc
=
zmq_bind
(
sb
,
"inproc://a"
);
rc
=
zmq_bind
(
sb
,
"inproc://a"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
zmq_sleep
(
1
);
msleep
(
SETTLE_TIME
);
void
*
sc
=
zmq_socket
(
ctx
,
ZMQ_PUSH
);
void
*
sc
=
zmq_socket
(
ctx
,
ZMQ_PUSH
);
rc
=
zmq_connect
(
sc
,
"inproc://a"
);
rc
=
zmq_connect
(
sc
,
"inproc://a"
);
...
...
tests/test_monitor.cpp
View file @
8797af77
...
@@ -211,7 +211,7 @@ int main (void)
...
@@ -211,7 +211,7 @@ int main (void)
rc
=
zmq_socket_monitor
(
req
,
"inproc://monitor.req"
,
ZMQ_EVENT_ALL
);
rc
=
zmq_socket_monitor
(
req
,
"inproc://monitor.req"
,
ZMQ_EVENT_ALL
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
threads
[
1
]
=
zmq_threadstart
(
&
req_socket_monitor
,
ctx
);
threads
[
1
]
=
zmq_threadstart
(
&
req_socket_monitor
,
ctx
);
zmq_sleep
(
1
);
msleep
(
SETTLE_TIME
);
// Bind REQ and REP
// Bind REQ and REP
rc
=
zmq_bind
(
rep
,
addr
.
c_str
());
rc
=
zmq_bind
(
rep
,
addr
.
c_str
());
...
@@ -238,8 +238,8 @@ int main (void)
...
@@ -238,8 +238,8 @@ int main (void)
rc
=
zmq_close
(
rep
);
rc
=
zmq_close
(
rep
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Allow
some
time for detecting error states
// Allow
enough
time for detecting error states
zmq_sleep
(
1
);
msleep
(
250
);
// Close the REQ socket
// Close the REQ socket
rc
=
zmq_close
(
req
);
rc
=
zmq_close
(
req
);
...
...
tests/test_proxy.cpp
View file @
8797af77
...
@@ -37,19 +37,17 @@
...
@@ -37,19 +37,17 @@
#define CONTENT_SIZE_MAX 32
#define CONTENT_SIZE_MAX 32
#define ID_SIZE 10
#define ID_SIZE 10
#define ID_SIZE_MAX 32
#define ID_SIZE_MAX 32
#define QT_WORKERS
5
#define QT_WORKERS
5
#define QT_CLIENTS
3
#define QT_CLIENTS
3
#define is_verbose 0
#define is_verbose 0
static
void
static
void
client_task
(
void
*
ctx
)
client_task
(
void
*
ctx
)
{
{
// void *ctx = zmq_ctx_new (); // if we want our own context, we shall use tcp instead of inproc for the control socket
// assert (ctx);
void
*
client
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
void
*
client
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
client
);
assert
(
client
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
control
);
assert
(
control
);
int
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
int
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
...
@@ -58,54 +56,52 @@ client_task (void *ctx)
...
@@ -58,54 +56,52 @@ client_task (void *ctx)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
char
content
[
CONTENT_SIZE_MAX
];
char
content
[
CONTENT_SIZE_MAX
];
// Set random identity to make tracing easier
// Set random identity to make tracing easier
char
identity
[
ID_SIZE
];
char
identity
[
ID_SIZE
];
sprintf
(
identity
,
"%04X-%04X"
,
rand
()
%
0xFFFF
,
rand
()
%
0xFFFF
);
sprintf
(
identity
,
"%04X-%04X"
,
rand
()
%
0xFFFF
,
rand
()
%
0xFFFF
);
rc
=
zmq_setsockopt
(
client
,
ZMQ_IDENTITY
,
identity
,
ID_SIZE
);
// includes '\0' as an helper for printf
rc
=
zmq_setsockopt
(
client
,
ZMQ_IDENTITY
,
identity
,
ID_SIZE
);
// includes '\0' as an helper for printf
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
client
,
"tcp://127.0.0.1:9999"
);
rc
=
zmq_connect
(
client
,
"tcp://127.0.0.1:9999"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
zmq_pollitem_t
items
[]
=
{
{
client
,
0
,
ZMQ_POLLIN
,
0
},
{
control
,
0
,
ZMQ_POLLIN
,
0
}
};
zmq_pollitem_t
items
[]
=
{
{
client
,
0
,
ZMQ_POLLIN
,
0
},
{
control
,
0
,
ZMQ_POLLIN
,
0
}
};
int
request_nbr
=
0
;
int
request_nbr
=
0
;
bool
run
=
true
;
bool
run
=
true
;
while
(
run
)
{
while
(
run
)
{
// Tick once per 200 ms, pulling in arriving messages
// Tick once per 200 ms, pulling in arriving messages
int
centitick
;
int
centitick
;
for
(
centitick
=
0
;
centitick
<
20
;
centitick
++
)
{
for
(
centitick
=
0
;
centitick
<
20
;
centitick
++
)
{
zmq_poll
(
items
,
2
,
10
);
zmq_poll
(
items
,
2
,
10
);
if
(
items
[
0
].
revents
&
ZMQ_POLLIN
)
{
if
(
items
[
0
].
revents
&
ZMQ_POLLIN
)
{
int
rcvmore
;
int
rcvmore
;
size_t
sz
=
sizeof
(
rcvmore
);
size_t
sz
=
sizeof
(
rcvmore
);
rc
=
zmq_recv
(
client
,
content
,
CONTENT_SIZE_MAX
,
0
);
rc
=
zmq_recv
(
client
,
content
,
CONTENT_SIZE_MAX
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
assert
(
rc
==
CONTENT_SIZE
);
if
(
is_verbose
)
printf
(
"client receive - identity = %s content = %s
\n
"
,
identity
,
content
);
if
(
is_verbose
)
printf
(
"client receive - identity = %s content = %s
\n
"
,
identity
,
content
);
// Check that message is still the same
// Check that message is still the same
assert
(
memcmp
(
content
,
"request #"
,
9
)
==
0
);
assert
(
memcmp
(
content
,
"request #"
,
9
)
==
0
);
rc
=
zmq_getsockopt
(
client
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
);
rc
=
zmq_getsockopt
(
client
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
assert
(
!
rcvmore
);
assert
(
!
rcvmore
);
}
}
if
(
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
if
(
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
0
);
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
0
);
if
(
is_verbose
)
printf
(
"client receive - identity = %s command = %s
\n
"
,
identity
,
content
);
if
(
is_verbose
)
printf
(
"client receive - identity = %s command = %s
\n
"
,
identity
,
content
);
if
(
memcmp
(
content
,
"TERMINATE"
,
10
)
==
0
)
{
if
(
memcmp
(
content
,
"TERMINATE"
,
10
)
==
0
)
{
run
=
false
;
run
=
false
;
break
;
break
;
}
}
}
}
}
}
sprintf
(
content
,
"request #%03d"
,
++
request_nbr
);
// CONTENT_SIZE
sprintf
(
content
,
"request #%03d"
,
++
request_nbr
);
// CONTENT_SIZE
rc
=
zmq_send
(
client
,
content
,
CONTENT_SIZE
,
0
);
rc
=
zmq_send
(
client
,
content
,
CONTENT_SIZE
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
assert
(
rc
==
CONTENT_SIZE
);
}
}
rc
=
zmq_close
(
client
);
rc
=
zmq_close
(
client
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// rc = zmq_ctx_term (ctx);
// assert (rc == 0);
}
}
// This is our server task.
// This is our server task.
...
@@ -119,22 +115,19 @@ static void server_worker (void *ctx);
...
@@ -119,22 +115,19 @@ static void server_worker (void *ctx);
void
void
server_task
(
void
*
ctx
)
server_task
(
void
*
ctx
)
{
{
// void *ctx = zmq_ctx_new (); // if we want our own context, we shall use tcp instead of inproc for the control socket
// assert (ctx);
// Frontend socket talks to clients over TCP
// Frontend socket talks to clients over TCP
void
*
frontend
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
void
*
frontend
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
assert
(
frontend
);
assert
(
frontend
);
int
rc
=
zmq_bind
(
frontend
,
"tcp://127.0.0.1:9999"
);
int
rc
=
zmq_bind
(
frontend
,
"tcp://127.0.0.1:9999"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Backend socket talks to workers over inproc
// Backend socket talks to workers over inproc
void
*
backend
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
void
*
backend
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
backend
);
assert
(
backend
);
rc
=
zmq_bind
(
backend
,
"inproc://backend"
);
rc
=
zmq_bind
(
backend
,
"inproc://backend"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
control
);
assert
(
control
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
...
@@ -142,17 +135,17 @@ server_task (void *ctx)
...
@@ -142,17 +135,17 @@ server_task (void *ctx)
rc
=
zmq_connect
(
control
,
"inproc://control"
);
rc
=
zmq_connect
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Launch pool of worker threads, precise number is not critical
// Launch pool of worker threads, precise number is not critical
int
thread_nbr
;
int
thread_nbr
;
void
*
threads
[
5
];
void
*
threads
[
5
];
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
threads
[
thread_nbr
]
=
zmq_threadstart
(
&
server_worker
,
ctx
);
threads
[
thread_nbr
]
=
zmq_threadstart
(
&
server_worker
,
ctx
);
// Connect backend to frontend via a proxy
// Connect backend to frontend via a proxy
zmq_proxy_steerable
(
frontend
,
backend
,
NULL
,
control
);
zmq_proxy_steerable
(
frontend
,
backend
,
NULL
,
control
);
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
zmq_threadclose
(
threads
[
thread_nbr
]);
zmq_threadclose
(
threads
[
thread_nbr
]);
rc
=
zmq_close
(
frontend
);
rc
=
zmq_close
(
frontend
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
@@ -160,8 +153,6 @@ server_task (void *ctx)
...
@@ -160,8 +153,6 @@ server_task (void *ctx)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// rc = zmq_ctx_term (ctx);
// assert (rc == 0);
}
}
// Each worker task works on one request at a time and sends a random number
// Each worker task works on one request at a time and sends a random number
...
@@ -176,7 +167,7 @@ server_worker (void *ctx)
...
@@ -176,7 +167,7 @@ server_worker (void *ctx)
int
rc
=
zmq_connect
(
worker
,
"inproc://backend"
);
int
rc
=
zmq_connect
(
worker
,
"inproc://backend"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
control
);
assert
(
control
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
...
@@ -184,44 +175,40 @@ server_worker (void *ctx)
...
@@ -184,44 +175,40 @@ server_worker (void *ctx)
rc
=
zmq_connect
(
control
,
"inproc://control"
);
rc
=
zmq_connect
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
char
content
[
CONTENT_SIZE_MAX
];
// bigger than what we need to check that
char
content
[
CONTENT_SIZE_MAX
];
// bigger than what we need to check that
char
identity
[
ID_SIZE_MAX
];
// the size received is the size sent
char
identity
[
ID_SIZE_MAX
];
// the size received is the size sent
// zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } }; // POLLING
bool
run
=
true
;
bool
run
=
true
;
while
(
run
)
{
while
(
run
)
{
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
ZMQ_DONTWAIT
);
// usually, rc == -1 (no message)
// zmq_poll (items, 2, 10); // POLLING
if
(
rc
>
0
)
{
// if (items [1].revents & ZMQ_POLLIN) { // POLLING
if
(
is_verbose
)
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
ZMQ_DONTWAIT
);
// usually, rc == -1 (no message)
printf
(
"server_worker receives command = %s
\n
"
,
content
);
if
(
rc
>
0
)
{
if
(
memcmp
(
content
,
"TERMINATE"
,
10
)
==
0
)
if
(
is_verbose
)
printf
(
"server_worker receives command = %s
\n
"
,
content
);
run
=
false
;
if
(
memcmp
(
content
,
"TERMINATE"
,
10
)
==
0
)
}
run
=
false
;
// The DEALER socket gives us the reply envelope and message
}
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
// } // POLLING
rc
=
zmq_recv
(
worker
,
identity
,
ID_SIZE_MAX
,
ZMQ_DONTWAIT
);
if
(
rc
==
ID_SIZE
)
{
// if (items [0].revents & ZMQ_POLLIN) { // POLLING
rc
=
zmq_recv
(
worker
,
content
,
CONTENT_SIZE_MAX
,
0
);
// The DEALER socket gives us the reply envelope and message
assert
(
rc
==
CONTENT_SIZE
);
rc
=
zmq_recv
(
worker
,
identity
,
ID_SIZE_MAX
,
ZMQ_DONTWAIT
);
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
if
(
is_verbose
)
if
(
rc
==
ID_SIZE
)
{
printf
(
"server receive - identity = %s content = %s
\n
"
,
identity
,
content
);
rc
=
zmq_recv
(
worker
,
content
,
CONTENT_SIZE_MAX
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
// Send 0..4 replies back
if
(
is_verbose
)
printf
(
"server receive - identity = %s content = %s
\n
"
,
identity
,
content
);
int
reply
,
replies
=
rand
()
%
5
;
for
(
reply
=
0
;
reply
<
replies
;
reply
++
)
{
// Send 0..4 replies back
// Sleep for some fraction of a second
int
reply
,
replies
=
rand
()
%
5
;
msleep
(
rand
()
%
10
+
1
);
for
(
reply
=
0
;
reply
<
replies
;
reply
++
)
{
// Send message from server to client
// Sleep for some fraction of a second
rc
=
zmq_send
(
worker
,
identity
,
ID_SIZE
,
ZMQ_SNDMORE
);
msleep
(
rand
()
%
10
+
1
);
assert
(
rc
==
ID_SIZE
);
// Send message from server to client
rc
=
zmq_send
(
worker
,
content
,
CONTENT_SIZE
,
0
);
rc
=
zmq_send
(
worker
,
identity
,
ID_SIZE
,
ZMQ_SNDMORE
);
assert
(
rc
==
CONTENT_SIZE
);
assert
(
rc
==
ID_SIZE
);
}
rc
=
zmq_send
(
worker
,
content
,
CONTENT_SIZE
,
0
);
}
assert
(
rc
==
CONTENT_SIZE
);
}
}
}
// } // POLLING
}
rc
=
zmq_close
(
worker
);
rc
=
zmq_close
(
worker
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control
);
rc
=
zmq_close
(
control
);
...
@@ -237,31 +224,28 @@ int main (void)
...
@@ -237,31 +224,28 @@ int main (void)
void
*
ctx
=
zmq_ctx_new
();
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
assert
(
ctx
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_PUB
);
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_PUB
);
assert
(
control
);
assert
(
control
);
int
rc
=
zmq_bind
(
control
,
"inproc://control"
);
int
rc
=
zmq_bind
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
void
*
threads
[
QT_CLIENTS
+
1
];
void
*
threads
[
QT_CLIENTS
+
1
];
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
i
++
)
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
i
++
)
{
threads
[
i
]
=
zmq_threadstart
(
&
client_task
,
ctx
);
threads
[
i
]
=
zmq_threadstart
(
&
client_task
,
ctx
);
threads
[
QT_CLIENTS
]
=
zmq_threadstart
(
&
server_task
,
ctx
);
}
msleep
(
500
);
// Run for 500 ms then quit
threads
[
QT_CLIENTS
]
=
zmq_threadstart
(
&
server_task
,
ctx
);
msleep
(
500
);
// Run for 500 ms then quit
rc
=
zmq_send
(
control
,
"TERMINATE"
,
10
,
0
);
rc
=
zmq_send
(
control
,
"TERMINATE"
,
10
,
0
);
assert
(
rc
==
10
);
assert
(
rc
==
10
);
// clean everything
rc
=
zmq_close
(
control
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
//msleep (1000); // not sure it is usefull
for
(
int
i
=
0
;
i
<
QT_CLIENTS
+
1
;
i
++
)
for
(
int
i
=
0
;
i
<
QT_CLIENTS
+
1
;
i
++
)
zmq_threadclose
(
threads
[
i
]);
zmq_threadclose
(
threads
[
i
]);
rc
=
zmq_ctx_term
(
ctx
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
return
0
;
return
0
;
}
}
tests/test_req_relaxed.cpp
View file @
8797af77
...
@@ -54,7 +54,7 @@ int main (void)
...
@@ -54,7 +54,7 @@ int main (void)
// We have to give the connects time to finish otherwise the requests
// We have to give the connects time to finish otherwise the requests
// will not properly round-robin. We could alternatively connect the
// will not properly round-robin. We could alternatively connect the
// REQ sockets to the REP sockets.
// REQ sockets to the REP sockets.
zmq_sleep
(
1
);
msleep
(
SETTLE_TIME
);
// Case 1: Second send() before a reply arrives in a pipe.
// Case 1: Second send() before a reply arrives in a pipe.
...
...
tests/test_spec_req.cpp
View file @
8797af77
...
@@ -46,7 +46,7 @@ void test_round_robin_out (void *ctx)
...
@@ -46,7 +46,7 @@ void test_round_robin_out (void *ctx)
// We have to give the connects time to finish otherwise the requests
// We have to give the connects time to finish otherwise the requests
// will not properly round-robin. We could alternatively connect the
// will not properly round-robin. We could alternatively connect the
// REQ sockets to the REP sockets.
// REQ sockets to the REP sockets.
zmq_sleep
(
1
);
msleep
(
SETTLE_TIME
);
// Send our peer-replies, and expect every REP it used once in order
// Send our peer-replies, and expect every REP it used once in order
for
(
size_t
peer
=
0
;
peer
<
services
;
peer
++
)
{
for
(
size_t
peer
=
0
;
peer
<
services
;
peer
++
)
{
...
...
tests/test_sub_forward.cpp
View file @
8797af77
...
@@ -59,7 +59,7 @@ int main (void)
...
@@ -59,7 +59,7 @@ int main (void)
assert
(
rc
>=
0
);
assert
(
rc
>=
0
);
// Wait a bit till the subscription gets to the publisher
// Wait a bit till the subscription gets to the publisher
zmq_sleep
(
1
);
msleep
(
SETTLE_TIME
);
// Send an empty message
// Send an empty message
rc
=
zmq_send
(
pub
,
NULL
,
0
,
0
);
rc
=
zmq_send
(
pub
,
NULL
,
0
,
0
);
...
...
tests/test_sub_forward_tipc.cpp
View file @
8797af77
...
@@ -67,7 +67,7 @@ int main (void)
...
@@ -67,7 +67,7 @@ int main (void)
assert
(
rc
>=
0
);
assert
(
rc
>=
0
);
// Wait a bit till the subscription gets to the publisher.
// Wait a bit till the subscription gets to the publisher.
zmq_sleep
(
1
);
msleep
(
SETTLE_TIME
);
// Send an empty message.
// Send an empty message.
rc
=
zmq_send
(
pub
,
NULL
,
0
,
0
);
rc
=
zmq_send
(
pub
,
NULL
,
0
,
0
);
...
...
tests/test_term_endpoint.cpp
View file @
8797af77
...
@@ -49,7 +49,7 @@ int main (void)
...
@@ -49,7 +49,7 @@ int main (void)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Allow unbind to settle
// Allow unbind to settle
zmq_sleep
(
1
);
msleep
(
SETTLE_TIME
);
// Check that sending would block (there's no outbound connection)
// Check that sending would block (there's no outbound connection)
rc
=
zmq_send
(
push
,
"ABC"
,
3
,
ZMQ_DONTWAIT
);
rc
=
zmq_send
(
push
,
"ABC"
,
3
,
ZMQ_DONTWAIT
);
...
@@ -86,7 +86,7 @@ int main (void)
...
@@ -86,7 +86,7 @@ int main (void)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Allow disconnect to settle
// Allow disconnect to settle
zmq_sleep
(
1
);
msleep
(
SETTLE_TIME
);
// Check that sending would block (there's no inbound connections).
// Check that sending would block (there's no inbound connections).
rc
=
zmq_send
(
push
,
"ABC"
,
3
,
ZMQ_DONTWAIT
);
rc
=
zmq_send
(
push
,
"ABC"
,
3
,
ZMQ_DONTWAIT
);
...
...
tests/test_term_endpoint_tipc.cpp
View file @
8797af77
...
@@ -59,7 +59,7 @@ int main (void)
...
@@ -59,7 +59,7 @@ int main (void)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Let events some time
// Let events some time
zmq_sleep
(
1
);
msleep
(
SETTLE_TIME
);
// Check that sending would block (there's no outbound connection).
// Check that sending would block (there's no outbound connection).
rc
=
zmq_send
(
push
,
"ABC"
,
3
,
ZMQ_DONTWAIT
);
rc
=
zmq_send
(
push
,
"ABC"
,
3
,
ZMQ_DONTWAIT
);
...
@@ -100,8 +100,7 @@ int main (void)
...
@@ -100,8 +100,7 @@ int main (void)
rc
=
zmq_disconnect
(
push
,
name
);
rc
=
zmq_disconnect
(
push
,
name
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Let events some time
msleep
(
SETTLE_TIME
);
zmq_sleep
(
1
);
// Check that sending would block (there's no inbound connections).
// Check that sending would block (there's no inbound connections).
rc
=
zmq_send
(
push
,
"ABC"
,
3
,
ZMQ_DONTWAIT
);
rc
=
zmq_send
(
push
,
"ABC"
,
3
,
ZMQ_DONTWAIT
);
...
...
tests/testutil.hpp
View file @
8797af77
...
@@ -24,6 +24,11 @@
...
@@ -24,6 +24,11 @@
#include "../include/zmq_utils.h"
#include "../include/zmq_utils.h"
#include "platform.hpp"
#include "platform.hpp"
// This defines the settle time used in tests; raise this if we
// get test failures on slower systems due to binds/connects not
// settled. Tested to work reliably at 1 msec on a fast PC.
#define SETTLE_TIME 10 // In msec
#undef NDEBUG
#undef NDEBUG
#include <time.h>
#include <time.h>
#include <assert.h>
#include <assert.h>
...
@@ -259,22 +264,15 @@ void setup_test_environment()
...
@@ -259,22 +264,15 @@ void setup_test_environment()
#endif
#endif
}
}
// provide portable millisecond sleep
// Provide portable millisecond sleep
#include <time.h>
// http://www.cplusplus.com/forum/unices/60161/ http://en.cppreference.com/w/cpp/thread/sleep_for
void
msleep
(
int
milliseconds
)
{
#ifdef ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_WINDOWS
#include <windows.h>
Sleep
(
milliseconds
);
#else
#else
#include <unistd.h>
usleep
(
static_cast
<
useconds_t
>
(
milliseconds
)
*
1000
);
#endif
#endif
void
msleep
(
int
milliseconds
)
{
// http://www.cplusplus.com/forum/unices/60161/ http://en.cppreference.com/w/cpp/thread/sleep_for
#ifdef ZMQ_HAVE_WINDOWS
Sleep
(
milliseconds
);
#else
usleep
(
static_cast
<
useconds_t
>
(
milliseconds
)
*
1000
);
#endif
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment