Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
1734a64d
Commit
1734a64d
authored
Aug 24, 2018
by
Simon Giesecke
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Revert "Problem: test_proxy not yet using unity"
This reverts commit
fd27324e
.
parent
e0fe7f10
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
192 additions
and
154 deletions
+192
-154
Makefile.am
Makefile.am
+1
-2
test_proxy.cpp
tests/test_proxy.cpp
+191
-152
No files found.
Makefile.am
View file @
1734a64d
...
...
@@ -626,8 +626,7 @@ tests_test_issue_566_SOURCES = tests/test_issue_566.cpp
tests_test_issue_566_LDADD
=
src/libzmq.la
tests_test_proxy_SOURCES
=
tests/test_proxy.cpp
tests_test_proxy_LDADD
=
src/libzmq.la
${
UNITY_LIBS
}
tests_test_proxy_CPPFLAGS
=
${
UNITY_CPPFLAGS
}
tests_test_proxy_LDADD
=
src/libzmq.la
tests_test_proxy_single_socket_SOURCES
=
tests/test_proxy_single_socket.cpp
tests_test_proxy_single_socket_LDADD
=
src/libzmq.la
...
...
tests/test_proxy.cpp
View file @
1734a64d
...
...
@@ -28,32 +28,21 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
void
setUp
()
{
setup_test_context
();
}
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
void
tearDown
()
{
teardown_test_context
();
}
// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
#define CONTENT_SIZE 12
#define CONTENT_SIZE 13
#define CONTENT_SIZE_MAX 32
#define ROUTING_ID_SIZE 10
#define ROUTING_ID_SIZE_MAX 32
...
...
@@ -63,6 +52,7 @@ void tearDown ()
struct
thread_data
{
void
*
ctx
;
int
id
;
};
...
...
@@ -86,38 +76,46 @@ void *g_workers_pkts_out = NULL;
static
void
client_task
(
void
*
db_
)
{
struct
thread_data
*
databag
=
static_cast
<
struct
thread_data
*>
(
db_
)
;
struct
thread_data
*
databag
=
(
struct
thread_data
*
)
db_
;
// Endpoint socket gets random port to avoid test failing when port in use
void
*
endpoint
=
test_context_socket
(
ZMQ_PAIR
);
void
*
endpoint
=
zmq_socket
(
databag
->
ctx
,
ZMQ_PAIR
);
assert
(
endpoint
);
int
linger
=
0
;
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
endpoint
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
))
);
int
rc
=
zmq_setsockopt
(
endpoint
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
char
endpoint_source
[
256
];
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
databag
->
id
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
endpoint
,
endpoint_source
));
rc
=
zmq_connect
(
endpoint
,
endpoint_source
);
assert
(
rc
==
0
);
char
*
my_endpoint
=
s_recv
(
endpoint
);
TEST_ASSERT_NOT_NULL
(
my_endpoint
);
assert
(
my_endpoint
);
void
*
client
=
test_context_socket
(
ZMQ_DEALER
);
void
*
client
=
zmq_socket
(
databag
->
ctx
,
ZMQ_DEALER
);
assert
(
client
);
// Control socket receives terminate command from main over inproc
void
*
control
=
test_context_socket
(
ZMQ_SUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
control
,
"inproc://control"
));
void
*
control
=
zmq_socket
(
databag
->
ctx
,
ZMQ_SUB
);
assert
(
control
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
assert
(
rc
==
0
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
char
content
[
CONTENT_SIZE_MAX
];
// Set random routing id to make tracing easier
char
routing_id
[
ROUTING_ID_SIZE
];
sprintf
(
routing_id
,
"%04X-%04X"
,
rand
()
%
0xFFFF
,
rand
()
%
0xFFFF
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
client
,
ZMQ_ROUTING_ID
,
routing_id
,
ROUTING_ID_SIZE
));
// includes '\0' as an helper for printf
rc
=
zmq_setsockopt
(
client
,
ZMQ_ROUTING_ID
,
routing_id
,
ROUTING_ID_SIZE
);
// includes '\0' as an helper for printf
assert
(
rc
==
0
);
linger
=
0
;
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
client
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
client
,
my_endpoint
));
rc
=
zmq_setsockopt
(
client
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
client
,
my_endpoint
);
assert
(
rc
==
0
);
zmq_pollitem_t
items
[]
=
{{
client
,
0
,
ZMQ_POLLIN
,
0
},
{
control
,
0
,
ZMQ_POLLIN
,
0
}};
...
...
@@ -125,27 +123,27 @@ static void client_task (void *db_)
bool
run
=
true
;
bool
keep_sending
=
true
;
while
(
run
)
{
for
(
int
centitick
=
0
;
centitick
<
20
;
centitick
++
)
{
// Tick once per 200 ms, pulling in arriving messages
int
centitick
;
for
(
centitick
=
0
;
centitick
<
20
;
centitick
++
)
{
zmq_poll
(
items
,
2
,
10
);
if
(
items
[
0
].
revents
&
ZMQ_POLLIN
)
{
int
rcvmore
;
size_t
sz
=
sizeof
(
rcvmore
);
TEST_ASSERT_EQUAL_INT
(
CONTENT_SIZE
,
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_recv
(
client
,
content
,
CONTENT_SIZE_MAX
,
0
)));
rc
=
zmq_recv
(
client
,
content
,
CONTENT_SIZE_MAX
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
if
(
is_verbose
)
printf
(
"client receive - routing_id = %s content = %s
\n
"
,
routing_id
,
content
);
// Check that message is still the same
TEST_ASSERT_EQUAL_STRING_LEN
(
"request #"
,
content
,
9
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_getsockopt
(
client
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
)
);
TEST_ASSERT_FALSE
(
rcvmore
);
assert
(
memcmp
(
content
,
"request #"
,
9
)
==
0
);
rc
=
zmq_getsockopt
(
client
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
);
assert
(
rc
==
0
);
assert
(
!
rcvmore
);
}
if
(
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
int
rc
=
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
0
));
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
0
);
if
(
rc
>
0
)
{
content
[
rc
]
=
0
;
// NULL-terminate the command string
...
...
@@ -172,13 +170,17 @@ static void client_task (void *db_)
routing_id
,
request_nbr
);
zmq_atomic_counter_inc
(
g_clients_pkts_out
);
send_string_expect_success
(
client
,
content
,
0
);
rc
=
zmq_send
(
client
,
content
,
CONTENT_SIZE
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
}
}
test_context_socket_close
(
client
);
test_context_socket_close
(
control
);
test_context_socket_close
(
endpoint
);
rc
=
zmq_close
(
client
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
endpoint
);
assert
(
rc
==
0
);
free
(
my_endpoint
);
}
...
...
@@ -188,64 +190,80 @@ static void client_task (void *db_)
// one request at a time but one client can talk to multiple workers at
// once.
static
void
server_worker
(
void
*
/*dummy_*/
);
static
void
server_worker
(
void
*
ctx_
);
void
server_task
(
void
*
/*dummy_*/
)
void
server_task
(
void
*
ctx_
)
{
// Frontend socket talks to clients over TCP
size_t
len
=
MAX_SOCKET_STRING
;
char
my_endpoint
[
MAX_SOCKET_STRING
];
void
*
frontend
=
test_context_socket
(
ZMQ_ROUTER
);
void
*
frontend
=
zmq_socket
(
ctx_
,
ZMQ_ROUTER
);
assert
(
frontend
);
int
linger
=
0
;
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
frontend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
bind_loopback_ipv4
(
frontend
,
my_endpoint
,
sizeof
my_endpoint
);
int
rc
=
zmq_setsockopt
(
frontend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
frontend
,
"tcp://127.0.0.1:*"
);
assert
(
rc
==
0
);
rc
=
zmq_getsockopt
(
frontend
,
ZMQ_LAST_ENDPOINT
,
my_endpoint
,
&
len
);
assert
(
rc
==
0
);
// Backend socket talks to workers over inproc
void
*
backend
=
test_context_socket
(
ZMQ_DEALER
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
backend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
backend
,
"inproc://backend"
));
void
*
backend
=
zmq_socket
(
ctx_
,
ZMQ_DEALER
);
assert
(
backend
);
rc
=
zmq_setsockopt
(
backend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
backend
,
"inproc://backend"
);
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
void
*
control
=
test_context_socket
(
ZMQ_REP
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
control
,
"inproc://control_proxy"
));
void
*
control
=
zmq_socket
(
ctx_
,
ZMQ_REP
);
assert
(
control
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
control
,
"inproc://control_proxy"
);
assert
(
rc
==
0
);
// Launch pool of worker threads, precise number is not critical
int
thread_nbr
;
void
*
threads
[
5
];
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
threads
[
thread_nbr
]
=
zmq_threadstart
(
&
server_worker
,
NULL
);
threads
[
thread_nbr
]
=
zmq_threadstart
(
&
server_worker
,
ctx_
);
// Endpoint socket sends random port to avoid test failing when port in use
void
*
endpoint_receivers
[
QT_CLIENTS
];
char
endpoint_source
[
256
];
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
endpoint_receivers
[
i
]
=
test_context_socket
(
ZMQ_PAIR
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
endpoint_receivers
[
i
],
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
endpoint_receivers
[
i
]
=
zmq_socket
(
ctx_
,
ZMQ_PAIR
);
assert
(
endpoint_receivers
[
i
]);
rc
=
zmq_setsockopt
(
endpoint_receivers
[
i
],
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
i
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
endpoint_receivers
[
i
],
endpoint_source
)
);
rc
=
zmq_bind
(
endpoint_receivers
[
i
],
endpoint_source
);
assert
(
rc
==
0
);
}
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
TEST_ASSERT_SUCCESS_ERRNO
(
s_send
(
endpoint_receivers
[
i
],
my_endpoint
));
rc
=
s_send
(
endpoint_receivers
[
i
],
my_endpoint
);
assert
(
rc
>
0
);
}
// Connect backend to frontend via a proxy
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_proxy_steerable
(
frontend
,
backend
,
NULL
,
control
)
);
rc
=
zmq_proxy_steerable
(
frontend
,
backend
,
NULL
,
control
);
assert
(
rc
==
0
);
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
zmq_threadclose
(
threads
[
thread_nbr
]);
test_context_socket_close
(
frontend
);
test_context_socket_close
(
backend
);
test_context_socket_close
(
control
);
rc
=
zmq_close
(
frontend
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
backend
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
test_context_socket_close
(
endpoint_receivers
[
i
]);
rc
=
zmq_close
(
endpoint_receivers
[
i
]);
assert
(
rc
==
0
);
}
}
...
...
@@ -253,20 +271,25 @@ void server_task (void * /*dummy_*/)
// of replies back, with random delays between replies:
// The comments in the first column, if suppressed, makes it a poller version
static
void
server_worker
(
void
*
/*dummy_*/
)
static
void
server_worker
(
void
*
ctx_
)
{
void
*
worker
=
test_context_socket
(
ZMQ_DEALER
);
void
*
worker
=
zmq_socket
(
ctx_
,
ZMQ_DEALER
);
assert
(
worker
);
int
linger
=
0
;
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
worker
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
worker
,
"inproc://backend"
));
int
rc
=
zmq_setsockopt
(
worker
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
worker
,
"inproc://backend"
);
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
void
*
control
=
test_context_socket
(
ZMQ_SUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
control
,
"inproc://control"
));
void
*
control
=
zmq_socket
(
ctx_
,
ZMQ_SUB
);
assert
(
control
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
assert
(
rc
==
0
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
char
content
[
CONTENT_SIZE_MAX
];
// bigger than what we need to check that
char
routing_id
[
ROUTING_ID_SIZE_MAX
];
// the size received is the size sent
...
...
@@ -274,8 +297,8 @@ static void server_worker (void * /*dummy_*/)
bool
run
=
true
;
bool
keep_sending
=
true
;
while
(
run
)
{
int
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
ZMQ_DONTWAIT
);
// usually, rc == -1 (no message)
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
ZMQ_DONTWAIT
);
// usually, rc == -1 (no message)
if
(
rc
>
0
)
{
content
[
rc
]
=
0
;
// NULL-terminate the command string
if
(
is_verbose
)
...
...
@@ -289,17 +312,16 @@ static void server_worker (void * /*dummy_*/)
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
rc
=
zmq_recv
(
worker
,
routing_id
,
ROUTING_ID_SIZE_MAX
,
ZMQ_DONTWAIT
);
if
(
rc
==
ROUTING_ID_SIZE
)
{
TEST_ASSERT_EQUAL_INT
(
CONTENT_SIZE
,
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_recv
(
worker
,
content
,
CONTENT_SIZE_MAX
,
0
)));
rc
=
zmq_recv
(
worker
,
content
,
CONTENT_SIZE_MAX
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
if
(
is_verbose
)
printf
(
"server receive - routing_id = %s content = %s
\n
"
,
routing_id
,
content
);
// Send 0..4 replies back
if
(
keep_sending
)
{
const
int
replies
=
rand
()
%
5
;
for
(
int
reply
=
0
;
reply
<
replies
;
reply
++
)
{
int
reply
,
replies
=
rand
()
%
5
;
for
(
reply
=
0
;
reply
<
replies
;
reply
++
)
{
// Sleep for some fraction of a second
msleep
(
rand
()
%
10
+
1
);
...
...
@@ -309,15 +331,19 @@ static void server_worker (void * /*dummy_*/)
routing_id
);
zmq_atomic_counter_inc
(
g_workers_pkts_out
);
send_string_expect_success
(
worker
,
routing_id
,
ZMQ_SNDMORE
);
send_string_expect_success
(
worker
,
content
,
0
);
rc
=
zmq_send
(
worker
,
routing_id
,
ROUTING_ID_SIZE
,
ZMQ_SNDMORE
);
assert
(
rc
==
ROUTING_ID_SIZE
);
rc
=
zmq_send
(
worker
,
content
,
CONTENT_SIZE
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
}
}
}
}
test_context_socket_close
(
worker
);
test_context_socket_close
(
control
);
rc
=
zmq_close
(
worker
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
}
uint64_t
recv_stat
(
void
*
sock_
,
bool
last_
)
...
...
@@ -325,18 +351,19 @@ uint64_t recv_stat (void *sock_, bool last_)
uint64_t
res
;
zmq_msg_t
stats_msg
;
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_init
(
&
stats_msg
)
);
TEST_ASSERT_EQUAL_INT
(
sizeof
(
uint64_t
),
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_recvmsg
(
sock_
,
&
stats_msg
,
0
)
));
int
rc
=
zmq_msg_init
(
&
stats_msg
);
assert
(
rc
==
0
);
rc
=
zmq_recvmsg
(
sock_
,
&
stats_msg
,
0
);
assert
(
rc
==
sizeof
(
uint64_t
));
memcpy
(
&
res
,
zmq_msg_data
(
&
stats_msg
),
zmq_msg_size
(
&
stats_msg
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_close
(
&
stats_msg
));
rc
=
zmq_msg_close
(
&
stats_msg
);
assert
(
rc
==
0
);
int
more
;
size_t
moresz
=
sizeof
more
;
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_getsockopt
(
sock_
,
ZMQ_RCVMORE
,
&
more
,
&
moresz
)
);
TEST_ASSERT_TRUE
((
last_
&&
!
more
)
||
(
!
last_
&&
more
));
rc
=
zmq_getsockopt
(
sock_
,
ZMQ_RCVMORE
,
&
more
,
&
moresz
);
assert
(
rc
==
0
);
assert
((
last_
&&
!
more
)
||
(
!
last_
&&
more
));
return
res
;
}
...
...
@@ -346,8 +373,10 @@ uint64_t recv_stat (void *sock_, bool last_)
void
check_proxy_stats
(
void
*
control_proxy_
)
{
zmq_proxy_stats_t
total_stats
;
int
rc
;
send_string_expect_success
(
control_proxy_
,
"STATISTICS"
,
0
);
rc
=
zmq_send
(
control_proxy_
,
"STATISTICS"
,
10
,
0
);
assert
(
rc
==
10
);
// first frame of the reply contains FRONTEND stats:
total_stats
.
frontend
.
msg_in
=
recv_stat
(
control_proxy_
,
false
);
...
...
@@ -382,54 +411,67 @@ void check_proxy_stats (void *control_proxy_)
printf
(
"workers sent out %d replies
\n
"
,
zmq_atomic_counter_value
(
g_workers_pkts_out
));
}
TEST_ASSERT_EQUAL_UINT64
(
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
),
total_stats
.
frontend
.
msg_in
);
TEST_ASSERT_EQUAL_UINT64
(
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
),
total_stats
.
frontend
.
msg_out
);
TEST_ASSERT_EQUAL_UINT64
(
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
),
total_stats
.
backend
.
msg_in
);
TEST_ASSERT_EQUAL_UINT64
(
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
),
total_stats
.
backend
.
msg_out
);
assert
(
total_stats
.
frontend
.
msg_in
==
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
));
assert
(
total_stats
.
frontend
.
msg_out
==
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
));
assert
(
total_stats
.
backend
.
msg_in
==
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
));
assert
(
total_stats
.
backend
.
msg_out
==
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
));
}
void
test_proxy
()
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int
main
(
void
)
{
setup_test_environment
();
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
g_clients_pkts_out
=
zmq_atomic_counter_new
();
g_workers_pkts_out
=
zmq_atomic_counter_new
();
// Control socket receives terminate command from main over inproc
void
*
control
=
test_context_socket
(
ZMQ_PUB
);
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_PUB
);
assert
(
control
);
int
linger
=
0
;
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
control
,
"inproc://control"
));
int
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
void
*
control_proxy
=
test_context_socket
(
ZMQ_REQ
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
control_proxy
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
control_proxy
,
"inproc://control_proxy"
));
void
*
control_proxy
=
zmq_socket
(
ctx
,
ZMQ_REQ
);
assert
(
control_proxy
);
rc
=
zmq_setsockopt
(
control_proxy
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
control_proxy
,
"inproc://control_proxy"
);
assert
(
rc
==
0
);
void
*
threads
[
QT_CLIENTS
+
1
];
struct
thread_data
databags
[
QT_CLIENTS
+
1
];
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
i
++
)
{
databags
[
i
].
ctx
=
ctx
;
databags
[
i
].
id
=
i
;
threads
[
i
]
=
zmq_threadstart
(
&
client_task
,
&
databags
[
i
]);
}
threads
[
QT_CLIENTS
]
=
zmq_threadstart
(
&
server_task
,
NULL
);
threads
[
QT_CLIENTS
]
=
zmq_threadstart
(
&
server_task
,
ctx
);
msleep
(
500
);
// Run for 500 ms then quit
if
(
is_verbose
)
printf
(
"stopping all clients and server workers
\n
"
);
send_string_expect_success
(
control
,
"STOP"
,
0
);
rc
=
zmq_send
(
control
,
"STOP"
,
4
,
0
);
assert
(
rc
==
4
);
msleep
(
500
);
// Wait for all clients and workers to STOP
#ifdef ZMQ_BUILD_DRAFT_API
if
(
is_verbose
)
printf
(
"retrieving stats from the proxy
\n
"
);
...
...
@@ -438,27 +480,24 @@ void test_proxy ()
if
(
is_verbose
)
printf
(
"shutting down all clients and server workers
\n
"
);
send_string_expect_success
(
control
,
"TERMINATE"
,
0
);
rc
=
zmq_send
(
control
,
"TERMINATE"
,
9
,
0
);
assert
(
rc
==
9
);
if
(
is_verbose
)
printf
(
"shutting down the proxy
\n
"
);
send_string_expect_success
(
control_proxy
,
"TERMINATE"
,
0
);
rc
=
zmq_send
(
control_proxy
,
"TERMINATE"
,
9
,
0
);
assert
(
rc
==
9
);
test_context_socket_close
(
control
);
test_context_socket_close
(
control_proxy
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control_proxy
);
assert
(
rc
==
0
);
for
(
int
i
=
0
;
i
<
QT_CLIENTS
+
1
;
i
++
)
zmq_threadclose
(
threads
[
i
]);
}
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int
main
(
void
)
{
setup_test_environment
();
UNITY_BEGIN
(
);
RUN_TEST
(
test_proxy
);
return
UNITY_END
()
;
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment