Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
6ed03e93
Commit
6ed03e93
authored
Mar 22, 2019
by
Simon Giesecke
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Problem: tests without test framework
Solution: migrate to Unity
parent
75cd23d6
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
150 additions
and
177 deletions
+150
-177
Makefile.am
Makefile.am
+2
-1
test_proxy.cpp
tests/test_proxy.cpp
+148
-176
No files found.
Makefile.am
View file @
6ed03e93
...
@@ -664,7 +664,8 @@ tests_test_issue_566_LDADD = src/libzmq.la ${UNITY_LIBS}
...
@@ -664,7 +664,8 @@ tests_test_issue_566_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_issue_566_CPPFLAGS
=
${
UNITY_CPPFLAGS
}
tests_test_issue_566_CPPFLAGS
=
${
UNITY_CPPFLAGS
}
tests_test_proxy_SOURCES
=
tests/test_proxy.cpp
tests_test_proxy_SOURCES
=
tests/test_proxy.cpp
tests_test_proxy_LDADD
=
src/libzmq.la
tests_test_proxy_LDADD
=
src/libzmq.la
${
UNITY_LIBS
}
tests_test_proxy_CPPFLAGS
=
${
UNITY_CPPFLAGS
}
tests_test_proxy_hwm_SOURCES
=
tests/test_proxy_hwm.cpp
tests_test_proxy_hwm_SOURCES
=
tests/test_proxy_hwm.cpp
tests_test_proxy_hwm_LDADD
=
src/libzmq.la
${
UNITY_LIBS
}
tests_test_proxy_hwm_LDADD
=
src/libzmq.la
${
UNITY_LIBS
}
...
...
tests/test_proxy.cpp
View file @
6ed03e93
...
@@ -28,19 +28,17 @@
...
@@ -28,19 +28,17 @@
*/
*/
#include "testutil.hpp"
#include "testutil.hpp"
#include "testutil_unity.hpp"
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
void
setUp
()
//
{
// While this example runs in a single process, that is to make
setup_test_context
();
// it easier to start and stop the example. Each task may have its own
}
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
// This is our client task
void
tearDown
()
// It connects to the server, and then sends a request once per second
{
// It collects responses as they arrive, and it prints them out. We will
teardown_test_context
();
// run several client tasks in parallel, each with a different random ID.
}
#define CONTENT_SIZE 13
#define CONTENT_SIZE 13
#define CONTENT_SIZE_MAX 32
#define CONTENT_SIZE_MAX 32
...
@@ -52,7 +50,6 @@
...
@@ -52,7 +50,6 @@
struct
thread_data
struct
thread_data
{
{
void
*
ctx
;
int
id
;
int
id
;
};
};
...
@@ -73,49 +70,56 @@ typedef struct
...
@@ -73,49 +70,56 @@ typedef struct
void
*
g_clients_pkts_out
=
NULL
;
void
*
g_clients_pkts_out
=
NULL
;
void
*
g_workers_pkts_out
=
NULL
;
void
*
g_workers_pkts_out
=
NULL
;
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
static
void
client_task
(
void
*
db_
)
static
void
client_task
(
void
*
db_
)
{
{
struct
thread_data
*
databag
=
(
struct
thread_data
*
)
db_
;
struct
thread_data
*
databag
=
(
struct
thread_data
*
)
db_
;
// Endpoint socket gets random port to avoid test failing when port in use
// Endpoint socket gets random port to avoid test failing when port in use
void
*
endpoint
=
zmq_socket
(
databag
->
ctx
,
ZMQ_PAIR
);
void
*
endpoint
=
zmq_socket
(
get_test_context
()
,
ZMQ_PAIR
);
assert
(
endpoint
);
TEST_ASSERT_NOT_NULL
(
endpoint
);
int
linger
=
0
;
int
linger
=
0
;
int
rc
=
zmq_setsockopt
(
endpoint
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
endpoint
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
))
);
char
endpoint_source
[
256
];
char
endpoint_source
[
256
];
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
databag
->
id
);
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
databag
->
id
);
rc
=
zmq_connect
(
endpoint
,
endpoint_source
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
endpoint
,
endpoint_source
));
assert
(
rc
==
0
);
char
*
my_endpoint
=
s_recv
(
endpoint
);
char
*
my_endpoint
=
s_recv
(
endpoint
);
assert
(
my_endpoint
);
TEST_ASSERT_NOT_NULL
(
my_endpoint
);
void
*
client
=
zmq_socket
(
databag
->
ctx
,
ZMQ_DEALER
);
void
*
client
=
zmq_socket
(
get_test_context
()
,
ZMQ_DEALER
);
assert
(
client
);
TEST_ASSERT_NOT_NULL
(
client
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
databag
->
ctx
,
ZMQ_SUB
);
void
*
control
=
zmq_socket
(
get_test_context
(),
ZMQ_SUB
);
assert
(
control
);
TEST_ASSERT_NOT_NULL
(
control
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
control
,
"inproc://control"
));
rc
=
zmq_connect
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
char
content
[
CONTENT_SIZE_MAX
]
=
{};
char
content
[
CONTENT_SIZE_MAX
]
=
{};
// Set random routing id to make tracing easier
// Set random routing id to make tracing easier
char
routing_id
[
ROUTING_ID_SIZE
]
=
{};
char
routing_id
[
ROUTING_ID_SIZE
]
=
{};
sprintf
(
routing_id
,
"%04X-%04X"
,
rand
()
%
0xFFFF
,
rand
()
%
0xFFFF
);
sprintf
(
routing_id
,
"%04X-%04X"
,
rand
()
%
0xFFFF
,
rand
()
%
0xFFFF
);
rc
=
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
zmq_setsockopt
(
client
,
ZMQ_ROUTING_ID
,
routing_id
,
client
,
ZMQ_ROUTING_ID
,
routing_id
,
ROUTING_ID_SIZE
);
// includes '\0' as an helper for printf
ROUTING_ID_SIZE
));
// includes '\0' as an helper for printf
assert
(
rc
==
0
);
linger
=
0
;
linger
=
0
;
rc
=
zmq_setsockopt
(
client
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
client
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_connect
(
client
,
my_endpoint
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
client
,
my_endpoint
));
assert
(
rc
==
0
);
zmq_pollitem_t
items
[]
=
{{
client
,
0
,
ZMQ_POLLIN
,
0
},
zmq_pollitem_t
items
[]
=
{{
client
,
0
,
ZMQ_POLLIN
,
0
},
{
control
,
0
,
ZMQ_POLLIN
,
0
}};
{
control
,
0
,
ZMQ_POLLIN
,
0
}};
...
@@ -130,20 +134,21 @@ static void client_task (void *db_)
...
@@ -130,20 +134,21 @@ static void client_task (void *db_)
if
(
items
[
0
].
revents
&
ZMQ_POLLIN
)
{
if
(
items
[
0
].
revents
&
ZMQ_POLLIN
)
{
int
rcvmore
;
int
rcvmore
;
size_t
sz
=
sizeof
(
rcvmore
);
size_t
sz
=
sizeof
(
rcvmore
);
rc
=
zmq_recv
(
client
,
content
,
CONTENT_SIZE_MAX
,
0
);
int
rc
=
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
CONTENT_SIZE
);
zmq_recv
(
client
,
content
,
CONTENT_SIZE_MAX
,
0
));
TEST_ASSERT_EQUAL_INT
(
CONTENT_SIZE
,
rc
);
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
printf
(
"client receive - routing_id = %s content = %s
\n
"
,
"client receive - routing_id = %s content = %s
\n
"
,
routing_id
,
content
);
routing_id
,
content
);
// Check that message is still the same
// Check that message is still the same
assert
(
memcmp
(
content
,
"request #"
,
9
)
==
0
);
TEST_ASSERT_EQUAL_STRING_LEN
(
"request #"
,
content
,
9
);
rc
=
zmq_getsockopt
(
client
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_getsockopt
(
client
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
)
);
assert
(
!
rcvmore
);
TEST_ASSERT_FALSE
(
rcvmore
);
}
}
if
(
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
if
(
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
0
);
int
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
0
);
if
(
rc
>
0
)
{
if
(
rc
>
0
)
{
content
[
rc
]
=
0
;
// NULL-terminate the command string
content
[
rc
]
=
0
;
// NULL-terminate the command string
...
@@ -170,17 +175,14 @@ static void client_task (void *db_)
...
@@ -170,17 +175,14 @@ static void client_task (void *db_)
routing_id
,
request_nbr
);
routing_id
,
request_nbr
);
zmq_atomic_counter_inc
(
g_clients_pkts_out
);
zmq_atomic_counter_inc
(
g_clients_pkts_out
);
rc
=
zmq_send
(
client
,
content
,
CONTENT_SIZE
,
0
);
TEST_ASSERT_EQUAL_INT
(
CONTENT_SIZE
,
assert
(
rc
==
CONTENT_SIZE
);
zmq_send
(
client
,
content
,
CONTENT_SIZE
,
0
)
);
}
}
}
}
rc
=
zmq_close
(
client
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_close
(
client
));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_close
(
control
));
rc
=
zmq_close
(
control
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_close
(
endpoint
));
assert
(
rc
==
0
);
rc
=
zmq_close
(
endpoint
);
assert
(
rc
==
0
);
free
(
my_endpoint
);
free
(
my_endpoint
);
}
}
...
@@ -190,80 +192,68 @@ static void client_task (void *db_)
...
@@ -190,80 +192,68 @@ static void client_task (void *db_)
// one request at a time but one client can talk to multiple workers at
// one request at a time but one client can talk to multiple workers at
// once.
// once.
static
void
server_worker
(
void
*
ctx_
);
static
void
server_worker
(
void
*
/*unused_*/
);
void
server_task
(
void
*
ctx_
)
void
server_task
(
void
*
/*unused_*/
)
{
{
// Frontend socket talks to clients over TCP
// Frontend socket talks to clients over TCP
size_t
len
=
MAX_SOCKET_STRING
;
char
my_endpoint
[
MAX_SOCKET_STRING
];
char
my_endpoint
[
MAX_SOCKET_STRING
];
void
*
frontend
=
zmq_socket
(
ctx_
,
ZMQ_ROUTER
);
void
*
frontend
=
zmq_socket
(
get_test_context
()
,
ZMQ_ROUTER
);
assert
(
frontend
);
TEST_ASSERT_NOT_NULL
(
frontend
);
int
linger
=
0
;
int
linger
=
0
;
int
rc
=
zmq_setsockopt
(
frontend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
frontend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_bind
(
frontend
,
"tcp://127.0.0.1:*"
);
bind_loopback_ipv4
(
frontend
,
my_endpoint
,
sizeof
my_endpoint
);
assert
(
rc
==
0
);
rc
=
zmq_getsockopt
(
frontend
,
ZMQ_LAST_ENDPOINT
,
my_endpoint
,
&
len
);
assert
(
rc
==
0
);
// Backend socket talks to workers over inproc
// Backend socket talks to workers over inproc
void
*
backend
=
zmq_socket
(
ctx_
,
ZMQ_DEALER
);
void
*
backend
=
zmq_socket
(
get_test_context
(),
ZMQ_DEALER
);
assert
(
backend
);
TEST_ASSERT_NOT_NULL
(
backend
);
rc
=
zmq_setsockopt
(
backend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
backend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_bind
(
backend
,
"inproc://backend"
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
backend
,
"inproc://backend"
));
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx_
,
ZMQ_REP
);
void
*
control
=
zmq_socket
(
get_test_context
(),
ZMQ_REP
);
assert
(
control
);
TEST_ASSERT_NOT_NULL
(
control
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_connect
(
control
,
"inproc://control_proxy"
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
control
,
"inproc://control_proxy"
));
assert
(
rc
==
0
);
// Launch pool of worker threads, precise number is not critical
// Launch pool of worker threads, precise number is not critical
int
thread_nbr
;
int
thread_nbr
;
void
*
threads
[
5
];
void
*
threads
[
5
];
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
threads
[
thread_nbr
]
=
zmq_threadstart
(
&
server_worker
,
ctx_
);
threads
[
thread_nbr
]
=
zmq_threadstart
(
&
server_worker
,
NULL
);
// Endpoint socket sends random port to avoid test failing when port in use
// Endpoint socket sends random port to avoid test failing when port in use
void
*
endpoint_receivers
[
QT_CLIENTS
];
void
*
endpoint_receivers
[
QT_CLIENTS
];
char
endpoint_source
[
256
];
char
endpoint_source
[
256
];
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
endpoint_receivers
[
i
]
=
zmq_socket
(
ctx_
,
ZMQ_PAIR
);
endpoint_receivers
[
i
]
=
zmq_socket
(
get_test_context
(),
ZMQ_PAIR
);
assert
(
endpoint_receivers
[
i
]);
TEST_ASSERT_NOT_NULL
(
endpoint_receivers
[
i
]);
rc
=
zmq_setsockopt
(
endpoint_receivers
[
i
],
ZMQ_LINGER
,
&
linger
,
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
sizeof
(
linger
));
endpoint_receivers
[
i
],
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
assert
(
rc
==
0
);
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
i
);
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
i
);
rc
=
zmq_bind
(
endpoint_receivers
[
i
],
endpoint_source
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_bind
(
endpoint_receivers
[
i
],
endpoint_source
)
);
}
}
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
rc
=
s_send
(
endpoint_receivers
[
i
],
my_endpoint
);
TEST_ASSERT_SUCCESS_ERRNO
(
s_send
(
endpoint_receivers
[
i
],
my_endpoint
));
assert
(
rc
>
0
);
}
}
// Connect backend to frontend via a proxy
// Connect backend to frontend via a proxy
rc
=
zmq_proxy_steerable
(
frontend
,
backend
,
NULL
,
control
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_proxy_steerable
(
frontend
,
backend
,
NULL
,
control
)
);
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
zmq_threadclose
(
threads
[
thread_nbr
]);
zmq_threadclose
(
threads
[
thread_nbr
]);
rc
=
zmq_close
(
frontend
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_close
(
frontend
));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_close
(
backend
));
rc
=
zmq_close
(
backend
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_close
(
control
));
assert
(
rc
==
0
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
rc
=
zmq_close
(
endpoint_receivers
[
i
]);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_close
(
endpoint_receivers
[
i
]));
assert
(
rc
==
0
);
}
}
}
}
...
@@ -271,25 +261,22 @@ void server_task (void *ctx_)
...
@@ -271,25 +261,22 @@ void server_task (void *ctx_)
// of replies back, with random delays between replies:
// of replies back, with random delays between replies:
// The comments in the first column, if suppressed, makes it a poller version
// The comments in the first column, if suppressed, makes it a poller version
static
void
server_worker
(
void
*
ctx_
)
static
void
server_worker
(
void
*
/*unused_*/
)
{
{
void
*
worker
=
zmq_socket
(
ctx_
,
ZMQ_DEALER
);
void
*
worker
=
zmq_socket
(
get_test_context
()
,
ZMQ_DEALER
);
assert
(
worker
);
TEST_ASSERT_NOT_NULL
(
worker
);
int
linger
=
0
;
int
linger
=
0
;
int
rc
=
zmq_setsockopt
(
worker
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
worker
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_connect
(
worker
,
"inproc://backend"
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
worker
,
"inproc://backend"
));
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx_
,
ZMQ_SUB
);
void
*
control
=
zmq_socket
(
get_test_context
(),
ZMQ_SUB
);
assert
(
control
);
TEST_ASSERT_NOT_NULL
(
control
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
control
,
"inproc://control"
));
rc
=
zmq_connect
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
char
content
[
CONTENT_SIZE_MAX
]
=
char
content
[
CONTENT_SIZE_MAX
]
=
{};
// bigger than what we need to check that
{};
// bigger than what we need to check that
...
@@ -299,8 +286,8 @@ static void server_worker (void *ctx_)
...
@@ -299,8 +286,8 @@ static void server_worker (void *ctx_)
bool
run
=
true
;
bool
run
=
true
;
bool
keep_sending
=
true
;
bool
keep_sending
=
true
;
while
(
run
)
{
while
(
run
)
{
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
int
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
ZMQ_DONTWAIT
);
// usually, rc == -1 (no message)
ZMQ_DONTWAIT
);
// usually, rc == -1 (no message)
if
(
rc
>
0
)
{
if
(
rc
>
0
)
{
content
[
rc
]
=
0
;
// NULL-terminate the command string
content
[
rc
]
=
0
;
// NULL-terminate the command string
if
(
is_verbose
)
if
(
is_verbose
)
...
@@ -315,7 +302,7 @@ static void server_worker (void *ctx_)
...
@@ -315,7 +302,7 @@ static void server_worker (void *ctx_)
rc
=
zmq_recv
(
worker
,
routing_id
,
ROUTING_ID_SIZE_MAX
,
ZMQ_DONTWAIT
);
rc
=
zmq_recv
(
worker
,
routing_id
,
ROUTING_ID_SIZE_MAX
,
ZMQ_DONTWAIT
);
if
(
rc
==
ROUTING_ID_SIZE
)
{
if
(
rc
==
ROUTING_ID_SIZE
)
{
rc
=
zmq_recv
(
worker
,
content
,
CONTENT_SIZE_MAX
,
0
);
rc
=
zmq_recv
(
worker
,
content
,
CONTENT_SIZE_MAX
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
TEST_ASSERT_EQUAL_INT
(
CONTENT_SIZE
,
rc
);
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"server receive - routing_id = %s content = %s
\n
"
,
printf
(
"server receive - routing_id = %s content = %s
\n
"
,
routing_id
,
content
);
routing_id
,
content
);
...
@@ -335,17 +322,15 @@ static void server_worker (void *ctx_)
...
@@ -335,17 +322,15 @@ static void server_worker (void *ctx_)
rc
=
zmq_send
(
worker
,
routing_id
,
ROUTING_ID_SIZE
,
rc
=
zmq_send
(
worker
,
routing_id
,
ROUTING_ID_SIZE
,
ZMQ_SNDMORE
);
ZMQ_SNDMORE
);
assert
(
rc
==
ROUTING_ID_SIZE
);
TEST_ASSERT_EQUAL_INT
(
ROUTING_ID_SIZE
,
rc
);
rc
=
zmq_send
(
worker
,
content
,
CONTENT_SIZE
,
0
);
rc
=
zmq_send
(
worker
,
content
,
CONTENT_SIZE
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
TEST_ASSERT_EQUAL_INT
(
CONTENT_SIZE
,
rc
);
}
}
}
}
}
}
}
}
rc
=
zmq_close
(
worker
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_close
(
worker
));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_close
(
control
));
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
}
}
uint64_t
recv_stat
(
void
*
sock_
,
bool
last_
)
uint64_t
recv_stat
(
void
*
sock_
,
bool
last_
)
...
@@ -353,19 +338,17 @@ uint64_t recv_stat (void *sock_, bool last_)
...
@@ -353,19 +338,17 @@ uint64_t recv_stat (void *sock_, bool last_)
uint64_t
res
;
uint64_t
res
;
zmq_msg_t
stats_msg
;
zmq_msg_t
stats_msg
;
int
rc
=
zmq_msg_init
(
&
stats_msg
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_init
(
&
stats_msg
));
assert
(
rc
==
0
);
TEST_ASSERT_EQUAL_INT
(
sizeof
(
uint64_t
),
rc
=
zmq_recvmsg
(
sock_
,
&
stats_msg
,
0
);
zmq_recvmsg
(
sock_
,
&
stats_msg
,
0
));
assert
(
rc
==
sizeof
(
uint64_t
));
memcpy
(
&
res
,
zmq_msg_data
(
&
stats_msg
),
zmq_msg_size
(
&
stats_msg
));
memcpy
(
&
res
,
zmq_msg_data
(
&
stats_msg
),
zmq_msg_size
(
&
stats_msg
));
rc
=
zmq_msg_close
(
&
stats_msg
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_close
(
&
stats_msg
));
assert
(
rc
==
0
);
int
more
;
int
more
;
size_t
moresz
=
sizeof
more
;
size_t
moresz
=
sizeof
more
;
rc
=
zmq_getsockopt
(
sock_
,
ZMQ_RCVMORE
,
&
more
,
&
moresz
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_getsockopt
(
sock_
,
ZMQ_RCVMORE
,
&
more
,
&
moresz
)
);
assert
((
last_
&&
!
more
)
||
(
!
last_
&&
more
));
TEST_ASSERT_TRUE
((
last_
&&
!
more
)
||
(
!
last_
&&
more
));
return
res
;
return
res
;
}
}
...
@@ -375,10 +358,8 @@ uint64_t recv_stat (void *sock_, bool last_)
...
@@ -375,10 +358,8 @@ uint64_t recv_stat (void *sock_, bool last_)
void
check_proxy_stats
(
void
*
control_proxy_
)
void
check_proxy_stats
(
void
*
control_proxy_
)
{
{
zmq_proxy_stats_t
total_stats
;
zmq_proxy_stats_t
total_stats
;
int
rc
;
rc
=
zmq_send
(
control_proxy_
,
"STATISTICS"
,
10
,
0
);
send_string_expect_success
(
control_proxy_
,
"STATISTICS"
,
0
);
assert
(
rc
==
10
);
// first frame of the reply contains FRONTEND stats:
// first frame of the reply contains FRONTEND stats:
total_stats
.
frontend
.
msg_in
=
recv_stat
(
control_proxy_
,
false
);
total_stats
.
frontend
.
msg_in
=
recv_stat
(
control_proxy_
,
false
);
...
@@ -413,91 +394,82 @@ void check_proxy_stats (void *control_proxy_)
...
@@ -413,91 +394,82 @@ void check_proxy_stats (void *control_proxy_)
printf
(
"workers sent out %d replies
\n
"
,
printf
(
"workers sent out %d replies
\n
"
,
zmq_atomic_counter_value
(
g_workers_pkts_out
));
zmq_atomic_counter_value
(
g_workers_pkts_out
));
}
}
assert
(
total_stats
.
frontend
.
msg_in
TEST_ASSERT_EQUAL_UINT
(
==
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
));
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
),
assert
(
total_stats
.
frontend
.
msg_out
total_stats
.
frontend
.
msg_in
);
==
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
));
TEST_ASSERT_EQUAL_UINT
(
assert
(
total_stats
.
backend
.
msg_in
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
),
==
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
));
total_stats
.
frontend
.
msg_out
);
assert
(
total_stats
.
backend
.
msg_out
TEST_ASSERT_EQUAL_UINT
(
==
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
));
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
),
total_stats
.
backend
.
msg_in
);
TEST_ASSERT_EQUAL_UINT
(
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
),
total_stats
.
backend
.
msg_out
);
}
}
// The main thread simply starts several clients and a server, and then
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
// waits for the server to finish.
int
main
(
void
)
void
test_proxy
(
)
{
{
setup_test_environment
();
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
g_clients_pkts_out
=
zmq_atomic_counter_new
();
g_clients_pkts_out
=
zmq_atomic_counter_new
();
g_workers_pkts_out
=
zmq_atomic_counter_new
();
g_workers_pkts_out
=
zmq_atomic_counter_new
();
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_PUB
);
void
*
control
=
test_context_socket
(
ZMQ_PUB
);
assert
(
control
);
int
linger
=
0
;
int
linger
=
0
;
int
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_bind
(
control
,
"inproc://control"
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
control
,
"inproc://control"
));
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control_proxy
=
zmq_socket
(
ctx
,
ZMQ_REQ
);
void
*
control_proxy
=
test_context_socket
(
ZMQ_REQ
);
assert
(
control_proxy
);
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_setsockopt
(
control_proxy
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
zmq_setsockopt
(
control_proxy
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_bind
(
control_proxy
,
"inproc://control_proxy"
);
zmq_bind
(
control_proxy
,
"inproc://control_proxy"
));
assert
(
rc
==
0
);
void
*
threads
[
QT_CLIENTS
+
1
];
void
*
threads
[
QT_CLIENTS
+
1
];
struct
thread_data
databags
[
QT_CLIENTS
+
1
];
struct
thread_data
databags
[
QT_CLIENTS
+
1
];
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
i
++
)
{
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
i
++
)
{
databags
[
i
].
ctx
=
ctx
;
databags
[
i
].
id
=
i
;
databags
[
i
].
id
=
i
;
threads
[
i
]
=
zmq_threadstart
(
&
client_task
,
&
databags
[
i
]);
threads
[
i
]
=
zmq_threadstart
(
&
client_task
,
&
databags
[
i
]);
}
}
threads
[
QT_CLIENTS
]
=
zmq_threadstart
(
&
server_task
,
ctx
);
threads
[
QT_CLIENTS
]
=
zmq_threadstart
(
&
server_task
,
NULL
);
msleep
(
500
);
// Run for 500 ms then quit
msleep
(
500
);
// Run for 500 ms then quit
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"stopping all clients and server workers
\n
"
);
printf
(
"stopping all clients and server workers
\n
"
);
rc
=
zmq_send
(
control
,
"STOP"
,
4
,
0
);
send_string_expect_success
(
control
,
"STOP"
,
0
);
assert
(
rc
==
4
);
msleep
(
500
);
// Wait for all clients and workers to STOP
msleep
(
500
);
// Wait for all clients and workers to STOP
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"retrieving stats from the proxy
\n
"
);
printf
(
"retrieving stats from the proxy
\n
"
);
check_proxy_stats
(
control_proxy
);
check_proxy_stats
(
control_proxy
);
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"shutting down all clients and server workers
\n
"
);
printf
(
"shutting down all clients and server workers
\n
"
);
rc
=
zmq_send
(
control
,
"TERMINATE"
,
9
,
0
);
send_string_expect_success
(
control
,
"TERMINATE"
,
0
);
assert
(
rc
==
9
);
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"shutting down the proxy
\n
"
);
printf
(
"shutting down the proxy
\n
"
);
rc
=
zmq_send
(
control_proxy
,
"TERMINATE"
,
9
,
0
);
send_string_expect_success
(
control_proxy
,
"TERMINATE"
,
0
);
assert
(
rc
==
9
);
rc
=
zmq_close
(
control
);
test_context_socket_close
(
control
);
assert
(
rc
==
0
);
test_context_socket_close
(
control_proxy
);
rc
=
zmq_close
(
control_proxy
);
assert
(
rc
==
0
);
for
(
int
i
=
0
;
i
<
QT_CLIENTS
+
1
;
i
++
)
for
(
int
i
=
0
;
i
<
QT_CLIENTS
+
1
;
i
++
)
zmq_threadclose
(
threads
[
i
]);
zmq_threadclose
(
threads
[
i
]);
}
int
main
(
void
)
{
setup_test_environment
();
rc
=
zmq_ctx_term
(
ctx
);
UNITY_BEGIN
(
);
assert
(
rc
==
0
);
RUN_TEST
(
test_proxy
);
return
0
;
return
UNITY_END
()
;
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment