Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
50dbd80c
Unverified
Commit
50dbd80c
authored
Aug 24, 2018
by
Luca Boccassi
Committed by
GitHub
Aug 24, 2018
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #3233 from sigiesec/migrate-tests-to-unity
Revert "Problem: test_proxy not yet using unity"
parents
e0fe7f10
1734a64d
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
192 additions
and
154 deletions
+192
-154
Makefile.am
Makefile.am
+1
-2
test_proxy.cpp
tests/test_proxy.cpp
+191
-152
No files found.
Makefile.am
View file @
50dbd80c
...
@@ -626,8 +626,7 @@ tests_test_issue_566_SOURCES = tests/test_issue_566.cpp
...
@@ -626,8 +626,7 @@ tests_test_issue_566_SOURCES = tests/test_issue_566.cpp
tests_test_issue_566_LDADD
=
src/libzmq.la
tests_test_issue_566_LDADD
=
src/libzmq.la
tests_test_proxy_SOURCES
=
tests/test_proxy.cpp
tests_test_proxy_SOURCES
=
tests/test_proxy.cpp
tests_test_proxy_LDADD
=
src/libzmq.la
${
UNITY_LIBS
}
tests_test_proxy_LDADD
=
src/libzmq.la
tests_test_proxy_CPPFLAGS
=
${
UNITY_CPPFLAGS
}
tests_test_proxy_single_socket_SOURCES
=
tests/test_proxy_single_socket.cpp
tests_test_proxy_single_socket_SOURCES
=
tests/test_proxy_single_socket.cpp
tests_test_proxy_single_socket_LDADD
=
src/libzmq.la
tests_test_proxy_single_socket_LDADD
=
src/libzmq.la
...
...
tests/test_proxy.cpp
View file @
50dbd80c
...
@@ -28,32 +28,21 @@
...
@@ -28,32 +28,21 @@
*/
*/
#include "testutil.hpp"
#include "testutil.hpp"
#include "testutil_unity.hpp"
void
setUp
()
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
{
//
setup_test_context
();
// While this example runs in a single process, that is to make
}
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
void
tearDown
()
// This is our client task
{
// It connects to the server, and then sends a request once per second
teardown_test_context
();
// It collects responses as they arrive, and it prints them out. We will
}
// run several client tasks in parallel, each with a different random ID.
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
#define CONTENT_SIZE 13
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
#define CONTENT_SIZE 12
#define CONTENT_SIZE_MAX 32
#define CONTENT_SIZE_MAX 32
#define ROUTING_ID_SIZE 10
#define ROUTING_ID_SIZE 10
#define ROUTING_ID_SIZE_MAX 32
#define ROUTING_ID_SIZE_MAX 32
...
@@ -63,6 +52,7 @@ void tearDown ()
...
@@ -63,6 +52,7 @@ void tearDown ()
struct
thread_data
struct
thread_data
{
{
void
*
ctx
;
int
id
;
int
id
;
};
};
...
@@ -86,38 +76,46 @@ void *g_workers_pkts_out = NULL;
...
@@ -86,38 +76,46 @@ void *g_workers_pkts_out = NULL;
static
void
client_task
(
void
*
db_
)
static
void
client_task
(
void
*
db_
)
{
{
struct
thread_data
*
databag
=
static_cast
<
struct
thread_data
*>
(
db_
)
;
struct
thread_data
*
databag
=
(
struct
thread_data
*
)
db_
;
// Endpoint socket gets random port to avoid test failing when port in use
// Endpoint socket gets random port to avoid test failing when port in use
void
*
endpoint
=
test_context_socket
(
ZMQ_PAIR
);
void
*
endpoint
=
zmq_socket
(
databag
->
ctx
,
ZMQ_PAIR
);
assert
(
endpoint
);
int
linger
=
0
;
int
linger
=
0
;
TEST_ASSERT_SUCCESS_ERRNO
(
int
rc
=
zmq_setsockopt
(
endpoint
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
zmq_setsockopt
(
endpoint
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
))
);
assert
(
rc
==
0
);
char
endpoint_source
[
256
];
char
endpoint_source
[
256
];
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
databag
->
id
);
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
databag
->
id
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
endpoint
,
endpoint_source
));
rc
=
zmq_connect
(
endpoint
,
endpoint_source
);
assert
(
rc
==
0
);
char
*
my_endpoint
=
s_recv
(
endpoint
);
char
*
my_endpoint
=
s_recv
(
endpoint
);
TEST_ASSERT_NOT_NULL
(
my_endpoint
);
assert
(
my_endpoint
);
void
*
client
=
test_context_socket
(
ZMQ_DEALER
);
void
*
client
=
zmq_socket
(
databag
->
ctx
,
ZMQ_DEALER
);
assert
(
client
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
test_context_socket
(
ZMQ_SUB
);
void
*
control
=
zmq_socket
(
databag
->
ctx
,
ZMQ_SUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
));
assert
(
control
);
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
control
,
"inproc://control"
));
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
char
content
[
CONTENT_SIZE_MAX
];
char
content
[
CONTENT_SIZE_MAX
];
// Set random routing id to make tracing easier
// Set random routing id to make tracing easier
char
routing_id
[
ROUTING_ID_SIZE
];
char
routing_id
[
ROUTING_ID_SIZE
];
sprintf
(
routing_id
,
"%04X-%04X"
,
rand
()
%
0xFFFF
,
rand
()
%
0xFFFF
);
sprintf
(
routing_id
,
"%04X-%04X"
,
rand
()
%
0xFFFF
,
rand
()
%
0xFFFF
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
rc
=
client
,
ZMQ_ROUTING_ID
,
routing_id
,
zmq_setsockopt
(
client
,
ZMQ_ROUTING_ID
,
routing_id
,
ROUTING_ID_SIZE
));
// includes '\0' as an helper for printf
ROUTING_ID_SIZE
);
// includes '\0' as an helper for printf
assert
(
rc
==
0
);
linger
=
0
;
linger
=
0
;
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_setsockopt
(
client
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
zmq_setsockopt
(
client
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
client
,
my_endpoint
));
rc
=
zmq_connect
(
client
,
my_endpoint
);
assert
(
rc
==
0
);
zmq_pollitem_t
items
[]
=
{{
client
,
0
,
ZMQ_POLLIN
,
0
},
zmq_pollitem_t
items
[]
=
{{
client
,
0
,
ZMQ_POLLIN
,
0
},
{
control
,
0
,
ZMQ_POLLIN
,
0
}};
{
control
,
0
,
ZMQ_POLLIN
,
0
}};
...
@@ -125,27 +123,27 @@ static void client_task (void *db_)
...
@@ -125,27 +123,27 @@ static void client_task (void *db_)
bool
run
=
true
;
bool
run
=
true
;
bool
keep_sending
=
true
;
bool
keep_sending
=
true
;
while
(
run
)
{
while
(
run
)
{
for
(
int
centitick
=
0
;
centitick
<
20
;
centitick
++
)
{
// Tick once per 200 ms, pulling in arriving messages
int
centitick
;
for
(
centitick
=
0
;
centitick
<
20
;
centitick
++
)
{
zmq_poll
(
items
,
2
,
10
);
zmq_poll
(
items
,
2
,
10
);
if
(
items
[
0
].
revents
&
ZMQ_POLLIN
)
{
if
(
items
[
0
].
revents
&
ZMQ_POLLIN
)
{
int
rcvmore
;
int
rcvmore
;
size_t
sz
=
sizeof
(
rcvmore
);
size_t
sz
=
sizeof
(
rcvmore
);
TEST_ASSERT_EQUAL_INT
(
rc
=
zmq_recv
(
client
,
content
,
CONTENT_SIZE_MAX
,
0
);
CONTENT_SIZE
,
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_recv
(
assert
(
rc
==
CONTENT_SIZE
);
client
,
content
,
CONTENT_SIZE_MAX
,
0
)));
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
printf
(
"client receive - routing_id = %s content = %s
\n
"
,
"client receive - routing_id = %s content = %s
\n
"
,
routing_id
,
content
);
routing_id
,
content
);
// Check that message is still the same
// Check that message is still the same
TEST_ASSERT_EQUAL_STRING_LEN
(
"request #"
,
content
,
9
);
assert
(
memcmp
(
content
,
"request #"
,
9
)
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_getsockopt
(
client
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
);
zmq_getsockopt
(
client
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
)
);
assert
(
rc
==
0
);
TEST_ASSERT_FALSE
(
rcvmore
);
assert
(
!
rcvmore
);
}
}
if
(
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
if
(
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
int
rc
=
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
0
);
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
0
));
if
(
rc
>
0
)
{
if
(
rc
>
0
)
{
content
[
rc
]
=
0
;
// NULL-terminate the command string
content
[
rc
]
=
0
;
// NULL-terminate the command string
...
@@ -172,13 +170,17 @@ static void client_task (void *db_)
...
@@ -172,13 +170,17 @@ static void client_task (void *db_)
routing_id
,
request_nbr
);
routing_id
,
request_nbr
);
zmq_atomic_counter_inc
(
g_clients_pkts_out
);
zmq_atomic_counter_inc
(
g_clients_pkts_out
);
send_string_expect_success
(
client
,
content
,
0
);
rc
=
zmq_send
(
client
,
content
,
CONTENT_SIZE
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
}
}
}
}
test_context_socket_close
(
client
);
rc
=
zmq_close
(
client
);
test_context_socket_close
(
control
);
assert
(
rc
==
0
);
test_context_socket_close
(
endpoint
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
endpoint
);
assert
(
rc
==
0
);
free
(
my_endpoint
);
free
(
my_endpoint
);
}
}
...
@@ -188,64 +190,80 @@ static void client_task (void *db_)
...
@@ -188,64 +190,80 @@ static void client_task (void *db_)
// one request at a time but one client can talk to multiple workers at
// one request at a time but one client can talk to multiple workers at
// once.
// once.
static
void
server_worker
(
void
*
/*dummy_*/
);
static
void
server_worker
(
void
*
ctx_
);
void
server_task
(
void
*
/*dummy_*/
)
void
server_task
(
void
*
ctx_
)
{
{
// Frontend socket talks to clients over TCP
// Frontend socket talks to clients over TCP
size_t
len
=
MAX_SOCKET_STRING
;
char
my_endpoint
[
MAX_SOCKET_STRING
];
char
my_endpoint
[
MAX_SOCKET_STRING
];
void
*
frontend
=
test_context_socket
(
ZMQ_ROUTER
);
void
*
frontend
=
zmq_socket
(
ctx_
,
ZMQ_ROUTER
);
assert
(
frontend
);
int
linger
=
0
;
int
linger
=
0
;
TEST_ASSERT_SUCCESS_ERRNO
(
int
rc
=
zmq_setsockopt
(
frontend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
zmq_setsockopt
(
frontend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
assert
(
rc
==
0
);
bind_loopback_ipv4
(
frontend
,
my_endpoint
,
sizeof
my_endpoint
);
rc
=
zmq_bind
(
frontend
,
"tcp://127.0.0.1:*"
);
assert
(
rc
==
0
);
rc
=
zmq_getsockopt
(
frontend
,
ZMQ_LAST_ENDPOINT
,
my_endpoint
,
&
len
);
assert
(
rc
==
0
);
// Backend socket talks to workers over inproc
// Backend socket talks to workers over inproc
void
*
backend
=
test_context_socket
(
ZMQ_DEALER
);
void
*
backend
=
zmq_socket
(
ctx_
,
ZMQ_DEALER
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
backend
);
zmq_setsockopt
(
backend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_setsockopt
(
backend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
backend
,
"inproc://backend"
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
backend
,
"inproc://backend"
);
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
test_context_socket
(
ZMQ_REP
);
void
*
control
=
zmq_socket
(
ctx_
,
ZMQ_REP
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
control
);
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
control
,
"inproc://control_proxy"
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
control
,
"inproc://control_proxy"
);
assert
(
rc
==
0
);
// Launch pool of worker threads, precise number is not critical
// Launch pool of worker threads, precise number is not critical
int
thread_nbr
;
int
thread_nbr
;
void
*
threads
[
5
];
void
*
threads
[
5
];
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
threads
[
thread_nbr
]
=
zmq_threadstart
(
&
server_worker
,
NULL
);
threads
[
thread_nbr
]
=
zmq_threadstart
(
&
server_worker
,
ctx_
);
// Endpoint socket sends random port to avoid test failing when port in use
// Endpoint socket sends random port to avoid test failing when port in use
void
*
endpoint_receivers
[
QT_CLIENTS
];
void
*
endpoint_receivers
[
QT_CLIENTS
];
char
endpoint_source
[
256
];
char
endpoint_source
[
256
];
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
endpoint_receivers
[
i
]
=
test_context_socket
(
ZMQ_PAIR
);
endpoint_receivers
[
i
]
=
zmq_socket
(
ctx_
,
ZMQ_PAIR
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
assert
(
endpoint_receivers
[
i
]);
endpoint_receivers
[
i
],
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_setsockopt
(
endpoint_receivers
[
i
],
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
i
);
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
i
);
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_bind
(
endpoint_receivers
[
i
],
endpoint_source
);
zmq_bind
(
endpoint_receivers
[
i
],
endpoint_source
)
);
assert
(
rc
==
0
);
}
}
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
TEST_ASSERT_SUCCESS_ERRNO
(
s_send
(
endpoint_receivers
[
i
],
my_endpoint
));
rc
=
s_send
(
endpoint_receivers
[
i
],
my_endpoint
);
assert
(
rc
>
0
);
}
}
// Connect backend to frontend via a proxy
// Connect backend to frontend via a proxy
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_proxy_steerable
(
frontend
,
backend
,
NULL
,
control
);
zmq_proxy_steerable
(
frontend
,
backend
,
NULL
,
control
)
);
assert
(
rc
==
0
);
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
zmq_threadclose
(
threads
[
thread_nbr
]);
zmq_threadclose
(
threads
[
thread_nbr
]);
test_context_socket_close
(
frontend
);
rc
=
zmq_close
(
frontend
);
test_context_socket_close
(
backend
);
assert
(
rc
==
0
);
test_context_socket_close
(
control
);
rc
=
zmq_close
(
backend
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
test_context_socket_close
(
endpoint_receivers
[
i
]);
rc
=
zmq_close
(
endpoint_receivers
[
i
]);
assert
(
rc
==
0
);
}
}
}
}
...
@@ -253,20 +271,25 @@ void server_task (void * /*dummy_*/)
...
@@ -253,20 +271,25 @@ void server_task (void * /*dummy_*/)
// of replies back, with random delays between replies:
// of replies back, with random delays between replies:
// The comments in the first column, if suppressed, makes it a poller version
// The comments in the first column, if suppressed, makes it a poller version
static
void
server_worker
(
void
*
/*dummy_*/
)
static
void
server_worker
(
void
*
ctx_
)
{
{
void
*
worker
=
test_context_socket
(
ZMQ_DEALER
);
void
*
worker
=
zmq_socket
(
ctx_
,
ZMQ_DEALER
);
assert
(
worker
);
int
linger
=
0
;
int
linger
=
0
;
TEST_ASSERT_SUCCESS_ERRNO
(
int
rc
=
zmq_setsockopt
(
worker
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
zmq_setsockopt
(
worker
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
worker
,
"inproc://backend"
));
rc
=
zmq_connect
(
worker
,
"inproc://backend"
);
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
test_context_socket
(
ZMQ_SUB
);
void
*
control
=
zmq_socket
(
ctx_
,
ZMQ_SUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
));
assert
(
control
);
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
control
,
"inproc://control"
));
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
char
content
[
CONTENT_SIZE_MAX
];
// bigger than what we need to check that
char
content
[
CONTENT_SIZE_MAX
];
// bigger than what we need to check that
char
routing_id
[
ROUTING_ID_SIZE_MAX
];
// the size received is the size sent
char
routing_id
[
ROUTING_ID_SIZE_MAX
];
// the size received is the size sent
...
@@ -274,8 +297,8 @@ static void server_worker (void * /*dummy_*/)
...
@@ -274,8 +297,8 @@ static void server_worker (void * /*dummy_*/)
bool
run
=
true
;
bool
run
=
true
;
bool
keep_sending
=
true
;
bool
keep_sending
=
true
;
while
(
run
)
{
while
(
run
)
{
int
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
ZMQ_DONTWAIT
);
// usually, rc == -1 (no message)
ZMQ_DONTWAIT
);
// usually, rc == -1 (no message)
if
(
rc
>
0
)
{
if
(
rc
>
0
)
{
content
[
rc
]
=
0
;
// NULL-terminate the command string
content
[
rc
]
=
0
;
// NULL-terminate the command string
if
(
is_verbose
)
if
(
is_verbose
)
...
@@ -289,17 +312,16 @@ static void server_worker (void * /*dummy_*/)
...
@@ -289,17 +312,16 @@ static void server_worker (void * /*dummy_*/)
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
rc
=
zmq_recv
(
worker
,
routing_id
,
ROUTING_ID_SIZE_MAX
,
ZMQ_DONTWAIT
);
rc
=
zmq_recv
(
worker
,
routing_id
,
ROUTING_ID_SIZE_MAX
,
ZMQ_DONTWAIT
);
if
(
rc
==
ROUTING_ID_SIZE
)
{
if
(
rc
==
ROUTING_ID_SIZE
)
{
TEST_ASSERT_EQUAL_INT
(
rc
=
zmq_recv
(
worker
,
content
,
CONTENT_SIZE_MAX
,
0
);
CONTENT_SIZE
,
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
CONTENT_SIZE
);
zmq_recv
(
worker
,
content
,
CONTENT_SIZE_MAX
,
0
)));
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"server receive - routing_id = %s content = %s
\n
"
,
printf
(
"server receive - routing_id = %s content = %s
\n
"
,
routing_id
,
content
);
routing_id
,
content
);
// Send 0..4 replies back
// Send 0..4 replies back
if
(
keep_sending
)
{
if
(
keep_sending
)
{
const
int
replies
=
rand
()
%
5
;
int
reply
,
replies
=
rand
()
%
5
;
for
(
int
reply
=
0
;
reply
<
replies
;
reply
++
)
{
for
(
reply
=
0
;
reply
<
replies
;
reply
++
)
{
// Sleep for some fraction of a second
// Sleep for some fraction of a second
msleep
(
rand
()
%
10
+
1
);
msleep
(
rand
()
%
10
+
1
);
...
@@ -309,15 +331,19 @@ static void server_worker (void * /*dummy_*/)
...
@@ -309,15 +331,19 @@ static void server_worker (void * /*dummy_*/)
routing_id
);
routing_id
);
zmq_atomic_counter_inc
(
g_workers_pkts_out
);
zmq_atomic_counter_inc
(
g_workers_pkts_out
);
send_string_expect_success
(
worker
,
routing_id
,
rc
=
zmq_send
(
worker
,
routing_id
,
ROUTING_ID_SIZE
,
ZMQ_SNDMORE
);
ZMQ_SNDMORE
);
send_string_expect_success
(
worker
,
content
,
0
);
assert
(
rc
==
ROUTING_ID_SIZE
);
rc
=
zmq_send
(
worker
,
content
,
CONTENT_SIZE
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
}
}
}
}
}
}
}
}
test_context_socket_close
(
worker
);
rc
=
zmq_close
(
worker
);
test_context_socket_close
(
control
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
}
}
uint64_t
recv_stat
(
void
*
sock_
,
bool
last_
)
uint64_t
recv_stat
(
void
*
sock_
,
bool
last_
)
...
@@ -325,18 +351,19 @@ uint64_t recv_stat (void *sock_, bool last_)
...
@@ -325,18 +351,19 @@ uint64_t recv_stat (void *sock_, bool last_)
uint64_t
res
;
uint64_t
res
;
zmq_msg_t
stats_msg
;
zmq_msg_t
stats_msg
;
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_init
(
&
stats_msg
)
);
int
rc
=
zmq_msg_init
(
&
stats_msg
);
TEST_ASSERT_EQUAL_INT
(
assert
(
rc
==
0
);
sizeof
(
uint64_t
),
rc
=
zmq_recvmsg
(
sock_
,
&
stats_msg
,
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_recvmsg
(
sock_
,
&
stats_msg
,
0
)
));
assert
(
rc
==
sizeof
(
uint64_t
));
memcpy
(
&
res
,
zmq_msg_data
(
&
stats_msg
),
zmq_msg_size
(
&
stats_msg
));
memcpy
(
&
res
,
zmq_msg_data
(
&
stats_msg
),
zmq_msg_size
(
&
stats_msg
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_close
(
&
stats_msg
));
rc
=
zmq_msg_close
(
&
stats_msg
);
assert
(
rc
==
0
);
int
more
;
int
more
;
size_t
moresz
=
sizeof
more
;
size_t
moresz
=
sizeof
more
;
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_getsockopt
(
sock_
,
ZMQ_RCVMORE
,
&
more
,
&
moresz
);
zmq_getsockopt
(
sock_
,
ZMQ_RCVMORE
,
&
more
,
&
moresz
)
);
assert
(
rc
==
0
);
TEST_ASSERT_TRUE
((
last_
&&
!
more
)
||
(
!
last_
&&
more
));
assert
((
last_
&&
!
more
)
||
(
!
last_
&&
more
));
return
res
;
return
res
;
}
}
...
@@ -346,8 +373,10 @@ uint64_t recv_stat (void *sock_, bool last_)
...
@@ -346,8 +373,10 @@ uint64_t recv_stat (void *sock_, bool last_)
void
check_proxy_stats
(
void
*
control_proxy_
)
void
check_proxy_stats
(
void
*
control_proxy_
)
{
{
zmq_proxy_stats_t
total_stats
;
zmq_proxy_stats_t
total_stats
;
int
rc
;
send_string_expect_success
(
control_proxy_
,
"STATISTICS"
,
0
);
rc
=
zmq_send
(
control_proxy_
,
"STATISTICS"
,
10
,
0
);
assert
(
rc
==
10
);
// first frame of the reply contains FRONTEND stats:
// first frame of the reply contains FRONTEND stats:
total_stats
.
frontend
.
msg_in
=
recv_stat
(
control_proxy_
,
false
);
total_stats
.
frontend
.
msg_in
=
recv_stat
(
control_proxy_
,
false
);
...
@@ -382,54 +411,67 @@ void check_proxy_stats (void *control_proxy_)
...
@@ -382,54 +411,67 @@ void check_proxy_stats (void *control_proxy_)
printf
(
"workers sent out %d replies
\n
"
,
printf
(
"workers sent out %d replies
\n
"
,
zmq_atomic_counter_value
(
g_workers_pkts_out
));
zmq_atomic_counter_value
(
g_workers_pkts_out
));
}
}
TEST_ASSERT_EQUAL_UINT64
(
assert
(
total_stats
.
frontend
.
msg_in
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
),
==
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
));
total_stats
.
frontend
.
msg_in
);
assert
(
total_stats
.
frontend
.
msg_out
TEST_ASSERT_EQUAL_UINT64
(
==
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
));
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
),
assert
(
total_stats
.
backend
.
msg_in
total_stats
.
frontend
.
msg_out
);
==
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
));
TEST_ASSERT_EQUAL_UINT64
(
assert
(
total_stats
.
backend
.
msg_out
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
),
==
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
));
total_stats
.
backend
.
msg_in
);
TEST_ASSERT_EQUAL_UINT64
(
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
),
total_stats
.
backend
.
msg_out
);
}
}
void
test_proxy
()
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int
main
(
void
)
{
{
setup_test_environment
();
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
g_clients_pkts_out
=
zmq_atomic_counter_new
();
g_clients_pkts_out
=
zmq_atomic_counter_new
();
g_workers_pkts_out
=
zmq_atomic_counter_new
();
g_workers_pkts_out
=
zmq_atomic_counter_new
();
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
test_context_socket
(
ZMQ_PUB
);
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_PUB
);
assert
(
control
);
int
linger
=
0
;
int
linger
=
0
;
TEST_ASSERT_SUCCESS_ERRNO
(
int
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
control
,
"inproc://control"
));
rc
=
zmq_bind
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control_proxy
=
test_context_socket
(
ZMQ_REQ
);
void
*
control_proxy
=
zmq_socket
(
ctx
,
ZMQ_REQ
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
control_proxy
);
zmq_setsockopt
(
control_proxy
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_setsockopt
(
control_proxy
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_bind
(
control_proxy
,
"inproc://control_proxy"
));
rc
=
zmq_bind
(
control_proxy
,
"inproc://control_proxy"
);
assert
(
rc
==
0
);
void
*
threads
[
QT_CLIENTS
+
1
];
void
*
threads
[
QT_CLIENTS
+
1
];
struct
thread_data
databags
[
QT_CLIENTS
+
1
];
struct
thread_data
databags
[
QT_CLIENTS
+
1
];
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
i
++
)
{
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
i
++
)
{
databags
[
i
].
ctx
=
ctx
;
databags
[
i
].
id
=
i
;
databags
[
i
].
id
=
i
;
threads
[
i
]
=
zmq_threadstart
(
&
client_task
,
&
databags
[
i
]);
threads
[
i
]
=
zmq_threadstart
(
&
client_task
,
&
databags
[
i
]);
}
}
threads
[
QT_CLIENTS
]
=
zmq_threadstart
(
&
server_task
,
NULL
);
threads
[
QT_CLIENTS
]
=
zmq_threadstart
(
&
server_task
,
ctx
);
msleep
(
500
);
// Run for 500 ms then quit
msleep
(
500
);
// Run for 500 ms then quit
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"stopping all clients and server workers
\n
"
);
printf
(
"stopping all clients and server workers
\n
"
);
send_string_expect_success
(
control
,
"STOP"
,
0
);
rc
=
zmq_send
(
control
,
"STOP"
,
4
,
0
);
assert
(
rc
==
4
);
msleep
(
500
);
// Wait for all clients and workers to STOP
msleep
(
500
);
// Wait for all clients and workers to STOP
#ifdef ZMQ_BUILD_DRAFT_API
#ifdef ZMQ_BUILD_DRAFT_API
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"retrieving stats from the proxy
\n
"
);
printf
(
"retrieving stats from the proxy
\n
"
);
...
@@ -438,27 +480,24 @@ void test_proxy ()
...
@@ -438,27 +480,24 @@ void test_proxy ()
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"shutting down all clients and server workers
\n
"
);
printf
(
"shutting down all clients and server workers
\n
"
);
send_string_expect_success
(
control
,
"TERMINATE"
,
0
);
rc
=
zmq_send
(
control
,
"TERMINATE"
,
9
,
0
);
assert
(
rc
==
9
);
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"shutting down the proxy
\n
"
);
printf
(
"shutting down the proxy
\n
"
);
send_string_expect_success
(
control_proxy
,
"TERMINATE"
,
0
);
rc
=
zmq_send
(
control_proxy
,
"TERMINATE"
,
9
,
0
);
assert
(
rc
==
9
);
test_context_socket_close
(
control
);
test_context_socket_close
(
control_proxy
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control_proxy
);
assert
(
rc
==
0
);
for
(
int
i
=
0
;
i
<
QT_CLIENTS
+
1
;
i
++
)
for
(
int
i
=
0
;
i
<
QT_CLIENTS
+
1
;
i
++
)
zmq_threadclose
(
threads
[
i
]);
zmq_threadclose
(
threads
[
i
]);
}
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int
main
(
void
)
{
setup_test_environment
();
UNITY_BEGIN
(
);
rc
=
zmq_ctx_term
(
ctx
);
RUN_TEST
(
test_proxy
);
assert
(
rc
==
0
);
return
UNITY_END
()
;
return
0
;
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment