Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
fd27324e
Commit
fd27324e
authored
Aug 23, 2018
by
Simon Giesecke
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Problem: test_proxy not yet using unity
Solution: migrate to unity
parent
76f2edd0
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
152 additions
and
190 deletions
+152
-190
Makefile.am
Makefile.am
+2
-1
test_proxy.cpp
tests/test_proxy.cpp
+150
-189
No files found.
Makefile.am
View file @
fd27324e
...
@@ -623,7 +623,8 @@ tests_test_issue_566_SOURCES = tests/test_issue_566.cpp
...
@@ -623,7 +623,8 @@ tests_test_issue_566_SOURCES = tests/test_issue_566.cpp
tests_test_issue_566_LDADD
=
src/libzmq.la
tests_test_issue_566_LDADD
=
src/libzmq.la
tests_test_proxy_SOURCES
=
tests/test_proxy.cpp
tests_test_proxy_SOURCES
=
tests/test_proxy.cpp
tests_test_proxy_LDADD
=
src/libzmq.la
tests_test_proxy_LDADD
=
src/libzmq.la
${
UNITY_LIBS
}
tests_test_proxy_CPPFLAGS
=
${
UNITY_CPPFLAGS
}
tests_test_proxy_single_socket_SOURCES
=
tests/test_proxy_single_socket.cpp
tests_test_proxy_single_socket_SOURCES
=
tests/test_proxy_single_socket.cpp
tests_test_proxy_single_socket_LDADD
=
src/libzmq.la
tests_test_proxy_single_socket_LDADD
=
src/libzmq.la
...
...
tests/test_proxy.cpp
View file @
fd27324e
...
@@ -28,21 +28,32 @@
...
@@ -28,21 +28,32 @@
*/
*/
#include "testutil.hpp"
#include "testutil.hpp"
#include "testutil_unity.hpp"
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
void
setUp
()
//
{
// While this example runs in a single process, that is to make
setup_test_context
();
// it easier to start and stop the example. Each task may have its own
}
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
// This is our client task
void
tearDown
()
// It connects to the server, and then sends a request once per second
{
// It collects responses as they arrive, and it prints them out. We will
teardown_test_context
();
// run several client tasks in parallel, each with a different random ID.
}
#define CONTENT_SIZE 13
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
#define CONTENT_SIZE 12
#define CONTENT_SIZE_MAX 32
#define CONTENT_SIZE_MAX 32
#define ROUTING_ID_SIZE 10
#define ROUTING_ID_SIZE 10
#define ROUTING_ID_SIZE_MAX 32
#define ROUTING_ID_SIZE_MAX 32
...
@@ -52,7 +63,6 @@
...
@@ -52,7 +63,6 @@
struct
thread_data
struct
thread_data
{
{
void
*
ctx
;
int
id
;
int
id
;
};
};
...
@@ -76,46 +86,38 @@ void *g_workers_pkts_out = NULL;
...
@@ -76,46 +86,38 @@ void *g_workers_pkts_out = NULL;
static
void
client_task
(
void
*
db_
)
static
void
client_task
(
void
*
db_
)
{
{
struct
thread_data
*
databag
=
(
struct
thread_data
*
)
db_
;
struct
thread_data
*
databag
=
static_cast
<
struct
thread_data
*>
(
db_
)
;
// Endpoint socket gets random port to avoid test failing when port in use
// Endpoint socket gets random port to avoid test failing when port in use
void
*
endpoint
=
zmq_socket
(
databag
->
ctx
,
ZMQ_PAIR
);
void
*
endpoint
=
test_context_socket
(
ZMQ_PAIR
);
assert
(
endpoint
);
int
linger
=
0
;
int
linger
=
0
;
int
rc
=
zmq_setsockopt
(
endpoint
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
endpoint
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
))
);
char
endpoint_source
[
256
];
char
endpoint_source
[
256
];
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
databag
->
id
);
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
databag
->
id
);
rc
=
zmq_connect
(
endpoint
,
endpoint_source
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
endpoint
,
endpoint_source
));
assert
(
rc
==
0
);
char
*
my_endpoint
=
s_recv
(
endpoint
);
char
*
my_endpoint
=
s_recv
(
endpoint
);
assert
(
my_endpoint
);
TEST_ASSERT_NOT_NULL
(
my_endpoint
);
void
*
client
=
zmq_socket
(
databag
->
ctx
,
ZMQ_DEALER
);
void
*
client
=
test_context_socket
(
ZMQ_DEALER
);
assert
(
client
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
databag
->
ctx
,
ZMQ_SUB
);
void
*
control
=
test_context_socket
(
ZMQ_SUB
);
assert
(
control
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
));
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
control
,
"inproc://control"
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
char
content
[
CONTENT_SIZE_MAX
];
char
content
[
CONTENT_SIZE_MAX
];
// Set random routing id to make tracing easier
// Set random routing id to make tracing easier
char
routing_id
[
ROUTING_ID_SIZE
];
char
routing_id
[
ROUTING_ID_SIZE
];
sprintf
(
routing_id
,
"%04X-%04X"
,
rand
()
%
0xFFFF
,
rand
()
%
0xFFFF
);
sprintf
(
routing_id
,
"%04X-%04X"
,
rand
()
%
0xFFFF
,
rand
()
%
0xFFFF
);
rc
=
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
zmq_setsockopt
(
client
,
ZMQ_ROUTING_ID
,
routing_id
,
client
,
ZMQ_ROUTING_ID
,
routing_id
,
ROUTING_ID_SIZE
);
// includes '\0' as an helper for printf
ROUTING_ID_SIZE
));
// includes '\0' as an helper for printf
assert
(
rc
==
0
);
linger
=
0
;
linger
=
0
;
rc
=
zmq_setsockopt
(
client
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
client
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_connect
(
client
,
my_endpoint
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
client
,
my_endpoint
));
assert
(
rc
==
0
);
zmq_pollitem_t
items
[]
=
{{
client
,
0
,
ZMQ_POLLIN
,
0
},
zmq_pollitem_t
items
[]
=
{{
client
,
0
,
ZMQ_POLLIN
,
0
},
{
control
,
0
,
ZMQ_POLLIN
,
0
}};
{
control
,
0
,
ZMQ_POLLIN
,
0
}};
...
@@ -123,27 +125,27 @@ static void client_task (void *db_)
...
@@ -123,27 +125,27 @@ static void client_task (void *db_)
bool
run
=
true
;
bool
run
=
true
;
bool
keep_sending
=
true
;
bool
keep_sending
=
true
;
while
(
run
)
{
while
(
run
)
{
// Tick once per 200 ms, pulling in arriving messages
for
(
int
centitick
=
0
;
centitick
<
20
;
centitick
++
)
{
int
centitick
;
for
(
centitick
=
0
;
centitick
<
20
;
centitick
++
)
{
zmq_poll
(
items
,
2
,
10
);
zmq_poll
(
items
,
2
,
10
);
if
(
items
[
0
].
revents
&
ZMQ_POLLIN
)
{
if
(
items
[
0
].
revents
&
ZMQ_POLLIN
)
{
int
rcvmore
;
int
rcvmore
;
size_t
sz
=
sizeof
(
rcvmore
);
size_t
sz
=
sizeof
(
rcvmore
);
rc
=
zmq_recv
(
client
,
content
,
CONTENT_SIZE_MAX
,
0
);
TEST_ASSERT_EQUAL_INT
(
assert
(
rc
==
CONTENT_SIZE
);
CONTENT_SIZE
,
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_recv
(
client
,
content
,
CONTENT_SIZE_MAX
,
0
)));
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
printf
(
"client receive - routing_id = %s content = %s
\n
"
,
"client receive - routing_id = %s content = %s
\n
"
,
routing_id
,
content
);
routing_id
,
content
);
// Check that message is still the same
// Check that message is still the same
assert
(
memcmp
(
content
,
"request #"
,
9
)
==
0
);
TEST_ASSERT_EQUAL_STRING_LEN
(
"request #"
,
content
,
9
);
rc
=
zmq_getsockopt
(
client
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_getsockopt
(
client
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
)
);
assert
(
!
rcvmore
);
TEST_ASSERT_FALSE
(
rcvmore
);
}
}
if
(
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
if
(
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
0
);
int
rc
=
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
0
));
if
(
rc
>
0
)
{
if
(
rc
>
0
)
{
content
[
rc
]
=
0
;
// NULL-terminate the command string
content
[
rc
]
=
0
;
// NULL-terminate the command string
...
@@ -170,17 +172,13 @@ static void client_task (void *db_)
...
@@ -170,17 +172,13 @@ static void client_task (void *db_)
routing_id
,
request_nbr
);
routing_id
,
request_nbr
);
zmq_atomic_counter_inc
(
g_clients_pkts_out
);
zmq_atomic_counter_inc
(
g_clients_pkts_out
);
rc
=
zmq_send
(
client
,
content
,
CONTENT_SIZE
,
0
);
send_string_expect_success
(
client
,
content
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
}
}
}
}
rc
=
zmq_close
(
client
);
test_context_socket_close
(
client
);
assert
(
rc
==
0
);
test_context_socket_close
(
control
);
rc
=
zmq_close
(
control
);
test_context_socket_close
(
endpoint
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
endpoint
);
assert
(
rc
==
0
);
free
(
my_endpoint
);
free
(
my_endpoint
);
}
}
...
@@ -190,80 +188,64 @@ static void client_task (void *db_)
...
@@ -190,80 +188,64 @@ static void client_task (void *db_)
// one request at a time but one client can talk to multiple workers at
// one request at a time but one client can talk to multiple workers at
// once.
// once.
static
void
server_worker
(
void
*
ctx_
);
static
void
server_worker
(
void
*
/*dummy_*/
);
void
server_task
(
void
*
ctx_
)
void
server_task
(
void
*
/*dummy_*/
)
{
{
// Frontend socket talks to clients over TCP
// Frontend socket talks to clients over TCP
size_t
len
=
MAX_SOCKET_STRING
;
char
my_endpoint
[
MAX_SOCKET_STRING
];
char
my_endpoint
[
MAX_SOCKET_STRING
];
void
*
frontend
=
zmq_socket
(
ctx_
,
ZMQ_ROUTER
);
void
*
frontend
=
test_context_socket
(
ZMQ_ROUTER
);
assert
(
frontend
);
int
linger
=
0
;
int
linger
=
0
;
int
rc
=
zmq_setsockopt
(
frontend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
frontend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_bind
(
frontend
,
"tcp://127.0.0.1:*"
);
bind_loopback_ipv4
(
frontend
,
my_endpoint
,
sizeof
my_endpoint
);
assert
(
rc
==
0
);
rc
=
zmq_getsockopt
(
frontend
,
ZMQ_LAST_ENDPOINT
,
my_endpoint
,
&
len
);
assert
(
rc
==
0
);
// Backend socket talks to workers over inproc
// Backend socket talks to workers over inproc
void
*
backend
=
zmq_socket
(
ctx_
,
ZMQ_DEALER
);
void
*
backend
=
test_context_socket
(
ZMQ_DEALER
);
assert
(
backend
);
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_setsockopt
(
backend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
zmq_setsockopt
(
backend
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
backend
,
"inproc://backend"
));
rc
=
zmq_bind
(
backend
,
"inproc://backend"
);
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx_
,
ZMQ_REP
);
void
*
control
=
test_context_socket
(
ZMQ_REP
);
assert
(
control
);
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
control
,
"inproc://control_proxy"
));
rc
=
zmq_connect
(
control
,
"inproc://control_proxy"
);
assert
(
rc
==
0
);
// Launch pool of worker threads, precise number is not critical
// Launch pool of worker threads, precise number is not critical
int
thread_nbr
;
int
thread_nbr
;
void
*
threads
[
5
];
void
*
threads
[
5
];
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
threads
[
thread_nbr
]
=
zmq_threadstart
(
&
server_worker
,
ctx_
);
threads
[
thread_nbr
]
=
zmq_threadstart
(
&
server_worker
,
NULL
);
// Endpoint socket sends random port to avoid test failing when port in use
// Endpoint socket sends random port to avoid test failing when port in use
void
*
endpoint_receivers
[
QT_CLIENTS
];
void
*
endpoint_receivers
[
QT_CLIENTS
];
char
endpoint_source
[
256
];
char
endpoint_source
[
256
];
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
endpoint_receivers
[
i
]
=
zmq_socket
(
ctx_
,
ZMQ_PAIR
);
endpoint_receivers
[
i
]
=
test_context_socket
(
ZMQ_PAIR
);
assert
(
endpoint_receivers
[
i
]);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
rc
=
zmq_setsockopt
(
endpoint_receivers
[
i
],
ZMQ_LINGER
,
&
linger
,
endpoint_receivers
[
i
],
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
sizeof
(
linger
));
assert
(
rc
==
0
);
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
i
);
sprintf
(
endpoint_source
,
"inproc://endpoint%d"
,
i
);
rc
=
zmq_bind
(
endpoint_receivers
[
i
],
endpoint_source
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_bind
(
endpoint_receivers
[
i
],
endpoint_source
)
);
}
}
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
rc
=
s_send
(
endpoint_receivers
[
i
],
my_endpoint
);
TEST_ASSERT_SUCCESS_ERRNO
(
s_send
(
endpoint_receivers
[
i
],
my_endpoint
));
assert
(
rc
>
0
);
}
}
// Connect backend to frontend via a proxy
// Connect backend to frontend via a proxy
rc
=
zmq_proxy_steerable
(
frontend
,
backend
,
NULL
,
control
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_proxy_steerable
(
frontend
,
backend
,
NULL
,
control
)
);
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
zmq_threadclose
(
threads
[
thread_nbr
]);
zmq_threadclose
(
threads
[
thread_nbr
]);
rc
=
zmq_close
(
frontend
);
test_context_socket_close
(
frontend
);
assert
(
rc
==
0
);
test_context_socket_close
(
backend
);
rc
=
zmq_close
(
backend
);
test_context_socket_close
(
control
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
++
i
)
{
rc
=
zmq_close
(
endpoint_receivers
[
i
]);
test_context_socket_close
(
endpoint_receivers
[
i
]);
assert
(
rc
==
0
);
}
}
}
}
...
@@ -271,25 +253,20 @@ void server_task (void *ctx_)
...
@@ -271,25 +253,20 @@ void server_task (void *ctx_)
// of replies back, with random delays between replies:
// of replies back, with random delays between replies:
// The comments in the first column, if suppressed, makes it a poller version
// The comments in the first column, if suppressed, makes it a poller version
static
void
server_worker
(
void
*
ctx_
)
static
void
server_worker
(
void
*
/*dummy_*/
)
{
{
void
*
worker
=
zmq_socket
(
ctx_
,
ZMQ_DEALER
);
void
*
worker
=
test_context_socket
(
ZMQ_DEALER
);
assert
(
worker
);
int
linger
=
0
;
int
linger
=
0
;
int
rc
=
zmq_setsockopt
(
worker
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
worker
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_connect
(
worker
,
"inproc://backend"
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
worker
,
"inproc://backend"
));
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx_
,
ZMQ_SUB
);
void
*
control
=
test_context_socket
(
ZMQ_SUB
);
assert
(
control
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
));
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
control
,
"inproc://control"
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
char
content
[
CONTENT_SIZE_MAX
];
// bigger than what we need to check that
char
content
[
CONTENT_SIZE_MAX
];
// bigger than what we need to check that
char
routing_id
[
ROUTING_ID_SIZE_MAX
];
// the size received is the size sent
char
routing_id
[
ROUTING_ID_SIZE_MAX
];
// the size received is the size sent
...
@@ -297,7 +274,7 @@ static void server_worker (void *ctx_)
...
@@ -297,7 +274,7 @@ static void server_worker (void *ctx_)
bool
run
=
true
;
bool
run
=
true
;
bool
keep_sending
=
true
;
bool
keep_sending
=
true
;
while
(
run
)
{
while
(
run
)
{
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
int
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
ZMQ_DONTWAIT
);
// usually, rc == -1 (no message)
ZMQ_DONTWAIT
);
// usually, rc == -1 (no message)
if
(
rc
>
0
)
{
if
(
rc
>
0
)
{
content
[
rc
]
=
0
;
// NULL-terminate the command string
content
[
rc
]
=
0
;
// NULL-terminate the command string
...
@@ -312,16 +289,17 @@ static void server_worker (void *ctx_)
...
@@ -312,16 +289,17 @@ static void server_worker (void *ctx_)
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
rc
=
zmq_recv
(
worker
,
routing_id
,
ROUTING_ID_SIZE_MAX
,
ZMQ_DONTWAIT
);
rc
=
zmq_recv
(
worker
,
routing_id
,
ROUTING_ID_SIZE_MAX
,
ZMQ_DONTWAIT
);
if
(
rc
==
ROUTING_ID_SIZE
)
{
if
(
rc
==
ROUTING_ID_SIZE
)
{
rc
=
zmq_recv
(
worker
,
content
,
CONTENT_SIZE_MAX
,
0
);
TEST_ASSERT_EQUAL_INT
(
assert
(
rc
==
CONTENT_SIZE
);
CONTENT_SIZE
,
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_recv
(
worker
,
content
,
CONTENT_SIZE_MAX
,
0
)));
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"server receive - routing_id = %s content = %s
\n
"
,
printf
(
"server receive - routing_id = %s content = %s
\n
"
,
routing_id
,
content
);
routing_id
,
content
);
// Send 0..4 replies back
// Send 0..4 replies back
if
(
keep_sending
)
{
if
(
keep_sending
)
{
int
reply
,
replies
=
rand
()
%
5
;
const
int
replies
=
rand
()
%
5
;
for
(
reply
=
0
;
reply
<
replies
;
reply
++
)
{
for
(
int
reply
=
0
;
reply
<
replies
;
reply
++
)
{
// Sleep for some fraction of a second
// Sleep for some fraction of a second
msleep
(
rand
()
%
10
+
1
);
msleep
(
rand
()
%
10
+
1
);
...
@@ -331,19 +309,15 @@ static void server_worker (void *ctx_)
...
@@ -331,19 +309,15 @@ static void server_worker (void *ctx_)
routing_id
);
routing_id
);
zmq_atomic_counter_inc
(
g_workers_pkts_out
);
zmq_atomic_counter_inc
(
g_workers_pkts_out
);
rc
=
zmq_send
(
worker
,
routing_id
,
ROUTING_ID_SIZE
,
send_string_expect_success
(
worker
,
routing_id
,
ZMQ_SNDMORE
);
ZMQ_SNDMORE
);
assert
(
rc
==
ROUTING_ID_SIZE
);
send_string_expect_success
(
worker
,
content
,
0
);
rc
=
zmq_send
(
worker
,
content
,
CONTENT_SIZE
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
}
}
}
}
}
}
}
}
rc
=
zmq_close
(
worker
);
test_context_socket_close
(
worker
);
assert
(
rc
==
0
);
test_context_socket_close
(
control
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
}
}
uint64_t
recv_stat
(
void
*
sock_
,
bool
last_
)
uint64_t
recv_stat
(
void
*
sock_
,
bool
last_
)
...
@@ -351,19 +325,18 @@ uint64_t recv_stat (void *sock_, bool last_)
...
@@ -351,19 +325,18 @@ uint64_t recv_stat (void *sock_, bool last_)
uint64_t
res
;
uint64_t
res
;
zmq_msg_t
stats_msg
;
zmq_msg_t
stats_msg
;
int
rc
=
zmq_msg_init
(
&
stats_msg
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_init
(
&
stats_msg
)
);
assert
(
rc
==
0
);
TEST_ASSERT_EQUAL_INT
(
rc
=
zmq_recvmsg
(
sock_
,
&
stats_msg
,
0
);
sizeof
(
uint64_t
),
assert
(
rc
==
sizeof
(
uint64_t
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_recvmsg
(
sock_
,
&
stats_msg
,
0
)
));
memcpy
(
&
res
,
zmq_msg_data
(
&
stats_msg
),
zmq_msg_size
(
&
stats_msg
));
memcpy
(
&
res
,
zmq_msg_data
(
&
stats_msg
),
zmq_msg_size
(
&
stats_msg
));
rc
=
zmq_msg_close
(
&
stats_msg
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_close
(
&
stats_msg
));
assert
(
rc
==
0
);
int
more
;
int
more
;
size_t
moresz
=
sizeof
more
;
size_t
moresz
=
sizeof
more
;
rc
=
zmq_getsockopt
(
sock_
,
ZMQ_RCVMORE
,
&
more
,
&
moresz
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_getsockopt
(
sock_
,
ZMQ_RCVMORE
,
&
more
,
&
moresz
)
);
assert
((
last_
&&
!
more
)
||
(
!
last_
&&
more
));
TEST_ASSERT_TRUE
((
last_
&&
!
more
)
||
(
!
last_
&&
more
));
return
res
;
return
res
;
}
}
...
@@ -373,10 +346,8 @@ uint64_t recv_stat (void *sock_, bool last_)
...
@@ -373,10 +346,8 @@ uint64_t recv_stat (void *sock_, bool last_)
void
check_proxy_stats
(
void
*
control_proxy_
)
void
check_proxy_stats
(
void
*
control_proxy_
)
{
{
zmq_proxy_stats_t
total_stats
;
zmq_proxy_stats_t
total_stats
;
int
rc
;
rc
=
zmq_send
(
control_proxy_
,
"STATISTICS"
,
10
,
0
);
send_string_expect_success
(
control_proxy_
,
"STATISTICS"
,
0
);
assert
(
rc
==
10
);
// first frame of the reply contains FRONTEND stats:
// first frame of the reply contains FRONTEND stats:
total_stats
.
frontend
.
msg_in
=
recv_stat
(
control_proxy_
,
false
);
total_stats
.
frontend
.
msg_in
=
recv_stat
(
control_proxy_
,
false
);
...
@@ -411,67 +382,54 @@ void check_proxy_stats (void *control_proxy_)
...
@@ -411,67 +382,54 @@ void check_proxy_stats (void *control_proxy_)
printf
(
"workers sent out %d replies
\n
"
,
printf
(
"workers sent out %d replies
\n
"
,
zmq_atomic_counter_value
(
g_workers_pkts_out
));
zmq_atomic_counter_value
(
g_workers_pkts_out
));
}
}
assert
(
total_stats
.
frontend
.
msg_in
TEST_ASSERT_EQUAL_UINT64
(
==
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
));
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
),
assert
(
total_stats
.
frontend
.
msg_out
total_stats
.
frontend
.
msg_in
);
==
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
));
TEST_ASSERT_EQUAL_UINT64
(
assert
(
total_stats
.
backend
.
msg_in
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
),
==
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
));
total_stats
.
frontend
.
msg_out
);
assert
(
total_stats
.
backend
.
msg_out
TEST_ASSERT_EQUAL_UINT64
(
==
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
));
(
unsigned
)
zmq_atomic_counter_value
(
g_workers_pkts_out
),
total_stats
.
backend
.
msg_in
);
TEST_ASSERT_EQUAL_UINT64
(
(
unsigned
)
zmq_atomic_counter_value
(
g_clients_pkts_out
),
total_stats
.
backend
.
msg_out
);
}
}
void
test_proxy
()
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int
main
(
void
)
{
{
setup_test_environment
();
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
g_clients_pkts_out
=
zmq_atomic_counter_new
();
g_clients_pkts_out
=
zmq_atomic_counter_new
();
g_workers_pkts_out
=
zmq_atomic_counter_new
();
g_workers_pkts_out
=
zmq_atomic_counter_new
();
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_PUB
);
void
*
control
=
test_context_socket
(
ZMQ_PUB
);
assert
(
control
);
int
linger
=
0
;
int
linger
=
0
;
int
rc
=
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
control
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
rc
=
zmq_bind
(
control
,
"inproc://control"
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
control
,
"inproc://control"
));
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
// Control socket receives terminate command from main over inproc
void
*
control_proxy
=
zmq_socket
(
ctx
,
ZMQ_REQ
);
void
*
control_proxy
=
test_context_socket
(
ZMQ_REQ
);
assert
(
control_proxy
);
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_setsockopt
(
control_proxy
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
zmq_setsockopt
(
control_proxy
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
rc
=
zmq_bind
(
control_proxy
,
"inproc://control_proxy"
);
zmq_bind
(
control_proxy
,
"inproc://control_proxy"
));
assert
(
rc
==
0
);
void
*
threads
[
QT_CLIENTS
+
1
];
void
*
threads
[
QT_CLIENTS
+
1
];
struct
thread_data
databags
[
QT_CLIENTS
+
1
];
struct
thread_data
databags
[
QT_CLIENTS
+
1
];
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
i
++
)
{
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
i
++
)
{
databags
[
i
].
ctx
=
ctx
;
databags
[
i
].
id
=
i
;
databags
[
i
].
id
=
i
;
threads
[
i
]
=
zmq_threadstart
(
&
client_task
,
&
databags
[
i
]);
threads
[
i
]
=
zmq_threadstart
(
&
client_task
,
&
databags
[
i
]);
}
}
threads
[
QT_CLIENTS
]
=
zmq_threadstart
(
&
server_task
,
ctx
);
threads
[
QT_CLIENTS
]
=
zmq_threadstart
(
&
server_task
,
NULL
);
msleep
(
500
);
// Run for 500 ms then quit
msleep
(
500
);
// Run for 500 ms then quit
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"stopping all clients and server workers
\n
"
);
printf
(
"stopping all clients and server workers
\n
"
);
rc
=
zmq_send
(
control
,
"STOP"
,
4
,
0
);
send_string_expect_success
(
control
,
"STOP"
,
0
);
assert
(
rc
==
4
);
msleep
(
500
);
// Wait for all clients and workers to STOP
msleep
(
500
);
// Wait for all clients and workers to STOP
#ifdef ZMQ_BUILD_DRAFT_API
#ifdef ZMQ_BUILD_DRAFT_API
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"retrieving stats from the proxy
\n
"
);
printf
(
"retrieving stats from the proxy
\n
"
);
...
@@ -480,24 +438,27 @@ int main (void)
...
@@ -480,24 +438,27 @@ int main (void)
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"shutting down all clients and server workers
\n
"
);
printf
(
"shutting down all clients and server workers
\n
"
);
rc
=
zmq_send
(
control
,
"TERMINATE"
,
9
,
0
);
send_string_expect_success
(
control
,
"TERMINATE"
,
0
);
assert
(
rc
==
9
);
if
(
is_verbose
)
if
(
is_verbose
)
printf
(
"shutting down the proxy
\n
"
);
printf
(
"shutting down the proxy
\n
"
);
rc
=
zmq_send
(
control_proxy
,
"TERMINATE"
,
9
,
0
);
send_string_expect_success
(
control_proxy
,
"TERMINATE"
,
0
);
assert
(
rc
==
9
);
test_context_socket_close
(
control
);
rc
=
zmq_close
(
control
);
test_context_socket_close
(
control_proxy
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control_proxy
);
assert
(
rc
==
0
);
for
(
int
i
=
0
;
i
<
QT_CLIENTS
+
1
;
i
++
)
for
(
int
i
=
0
;
i
<
QT_CLIENTS
+
1
;
i
++
)
zmq_threadclose
(
threads
[
i
]);
zmq_threadclose
(
threads
[
i
]);
}
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int
main
(
void
)
{
setup_test_environment
();
rc
=
zmq_ctx_term
(
ctx
);
UNITY_BEGIN
(
);
assert
(
rc
==
0
);
RUN_TEST
(
test_proxy
);
return
0
;
return
UNITY_END
()
;
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment