Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
713f075f
Commit
713f075f
authored
Aug 20, 2018
by
Simon Giesecke
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Problem: test_xpub_manual not yet using unity
Solution: migrate to unity
parent
407bd3b1
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
238 additions
and
351 deletions
+238
-351
Makefile.am
Makefile.am
+2
-1
test_xpub_manual.cpp
tests/test_xpub_manual.cpp
+210
-350
testutil_unity.hpp
tests/testutil_unity.hpp
+26
-0
No files found.
Makefile.am
View file @
713f075f
...
...
@@ -657,7 +657,8 @@ tests_test_xpub_nodrop_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_xpub_nodrop_CPPFLAGS
=
${
UNITY_CPPFLAGS
}
tests_test_xpub_manual_SOURCES
=
tests/test_xpub_manual.cpp
tests_test_xpub_manual_LDADD
=
src/libzmq.la
tests_test_xpub_manual_LDADD
=
src/libzmq.la
${
UNITY_LIBS
}
tests_test_xpub_manual_CPPFLAGS
=
${
UNITY_CPPFLAGS
}
tests_test_xpub_welcome_msg_SOURCES
=
tests/test_xpub_welcome_msg.cpp
tests_test_xpub_welcome_msg_LDADD
=
src/libzmq.la
...
...
tests/test_xpub_manual.cpp
View file @
713f075f
...
...
@@ -28,570 +28,430 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int
test_basic
()
void
setUp
()
{
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
setup_test_context
();
}
void
tearDown
()
{
teardown_test_context
();
}
void
test_basic
()
{
// Create a publisher
void
*
pub
=
zmq_socket
(
ctx
,
ZMQ_XPUB
);
assert
(
pub
);
void
*
pub
=
test_context_socket
(
ZMQ_XPUB
);
int
manual
=
1
;
int
rc
=
zmq_setsockopt
(
pub
,
ZMQ_XPUB_MANUAL
,
&
manual
,
4
);
assert
(
rc
==
0
);
rc
=
zmq_bind
(
pub
,
"inproc://soname"
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pub
,
ZMQ_XPUB_MANUAL
,
&
manual
,
4
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
pub
,
"inproc://soname"
));
// Create a subscriber
void
*
sub
=
zmq_socket
(
ctx
,
ZMQ_XSUB
);
assert
(
sub
);
rc
=
zmq_connect
(
sub
,
"inproc://soname"
);
assert
(
rc
==
0
);
void
*
sub
=
test_context_socket
(
ZMQ_XSUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sub
,
"inproc://soname"
));
// Subscribe for A
char
subscription
[
2
]
=
{
1
,
'A'
};
rc
=
zmq_send_const
(
sub
,
subscription
,
2
,
0
);
assert
(
rc
==
2
);
char
buffer
[
2
];
const
char
subscription
[]
=
{
1
,
'A'
,
0
};
send_string_expect_success
(
sub
,
subscription
,
0
);
// Receive subscriptions from subscriber
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
assert
(
rc
==
2
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'A'
);
recv_string_expect_success
(
pub
,
subscription
,
0
);
// Subscribe socket for B instead
rc
=
zmq_setsockopt
(
pub
,
ZMQ_SUBSCRIBE
,
"B"
,
1
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pub
,
ZMQ_SUBSCRIBE
,
"B"
,
1
));
// Sending A message and B Message
rc
=
zmq_send_const
(
pub
,
"A"
,
1
,
0
);
assert
(
rc
==
1
);
rc
=
zmq_send_const
(
pub
,
"B"
,
1
,
0
);
assert
(
rc
==
1
);
send_string_expect_success
(
pub
,
"A"
,
0
);
send_string_expect_success
(
pub
,
"B"
,
0
);
rc
=
zmq_recv
(
sub
,
buffer
,
1
,
ZMQ_DONTWAIT
);
assert
(
rc
==
1
);
assert
(
buffer
[
0
]
==
'B'
);
recv_string_expect_success
(
sub
,
"B"
,
ZMQ_DONTWAIT
);
// Clean up.
rc
=
zmq_close
(
pub
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
sub
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
return
0
;
test_context_socket_close
(
pub
);
test_context_socket_close
(
sub
);
}
int
test_unsubscribe_manual
()
void
test_unsubscribe_manual
()
{
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
// Create a publisher
void
*
pub
=
zmq_socket
(
ctx
,
ZMQ_XPUB
);
assert
(
pub
);
int
rc
=
zmq_bind
(
pub
,
"inproc://soname"
);
assert
(
rc
==
0
);
void
*
pub
=
test_context_socket
(
ZMQ_XPUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
pub
,
"inproc://soname"
));
// set pub socket options
int
manual
=
1
;
rc
=
zmq_setsockopt
(
pub
,
ZMQ_XPUB_MANUAL
,
&
manual
,
4
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pub
,
ZMQ_XPUB_MANUAL
,
&
manual
,
sizeof
(
manual
))
);
// Create a subscriber
void
*
sub
=
zmq_socket
(
ctx
,
ZMQ_XSUB
);
assert
(
sub
);
rc
=
zmq_connect
(
sub
,
"inproc://soname"
);
assert
(
rc
==
0
);
void
*
sub
=
test_context_socket
(
ZMQ_XSUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sub
,
"inproc://soname"
));
// Subscribe for A
char
subscription1
[
2
]
=
{
1
,
'A'
};
rc
=
zmq_send_const
(
sub
,
subscription1
,
2
,
0
);
assert
(
rc
==
2
);
const
uint8_t
subscription1
[]
=
{
1
,
'A'
};
send_array_expect_success
(
sub
,
subscription1
,
0
);
// Subscribe for B
char
subscription2
[
2
]
=
{
1
,
'B'
};
rc
=
zmq_send_const
(
sub
,
subscription2
,
2
,
0
);
assert
(
rc
==
2
);
const
uint8_t
subscription2
[]
=
{
1
,
'B'
};
send_array_expect_success
(
sub
,
subscription2
,
0
);
char
buffer
[
3
];
// Receive subscription "A" from subscriber
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
assert
(
rc
==
2
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'A'
);
recv_array_expect_success
(
pub
,
subscription1
,
0
);
// Subscribe socket for XA instead
rc
=
zmq_setsockopt
(
pub
,
ZMQ_SUBSCRIBE
,
"XA"
,
2
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pub
,
ZMQ_SUBSCRIBE
,
"XA"
,
2
));
// Receive subscription "B" from subscriber
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
assert
(
rc
==
2
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'B'
);
recv_array_expect_success
(
pub
,
subscription2
,
0
);
// Subscribe socket for XB instead
rc
=
zmq_setsockopt
(
pub
,
ZMQ_SUBSCRIBE
,
"XB"
,
2
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pub
,
ZMQ_SUBSCRIBE
,
"XB"
,
2
));
// Unsubscribe from A
char
unsubscription1
[
2
]
=
{
0
,
'A'
};
rc
=
zmq_send_const
(
sub
,
unsubscription1
,
2
,
0
);
assert
(
rc
==
2
);
const
uint8_t
unsubscription1
[
2
]
=
{
0
,
'A'
};
send_array_expect_success
(
sub
,
unsubscription1
,
0
);
// Receive unsubscription "A" from subscriber
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
assert
(
rc
==
2
);
assert
(
buffer
[
0
]
==
0
);
assert
(
buffer
[
1
]
==
'A'
);
recv_array_expect_success
(
pub
,
unsubscription1
,
0
);
// Unsubscribe socket from XA instead
rc
=
zmq_setsockopt
(
pub
,
ZMQ_UNSUBSCRIBE
,
"XA"
,
2
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pub
,
ZMQ_UNSUBSCRIBE
,
"XA"
,
2
));
// Sending messages XA, XB
rc
=
zmq_send_const
(
pub
,
"XA"
,
2
,
0
);
assert
(
rc
==
2
);
rc
=
zmq_send_const
(
pub
,
"XB"
,
2
,
0
);
assert
(
rc
==
2
);
send_string_expect_success
(
pub
,
"XA"
,
0
);
send_string_expect_success
(
pub
,
"XB"
,
0
);
// Subscriber should receive XB only
rc
=
zmq_recv
(
sub
,
buffer
,
2
,
ZMQ_DONTWAIT
);
assert
(
rc
==
2
);
assert
(
buffer
[
0
]
==
'X'
);
assert
(
buffer
[
1
]
==
'B'
);
recv_string_expect_success
(
sub
,
"XB"
,
ZMQ_DONTWAIT
);
// Close subscriber
rc
=
zmq_close
(
sub
);
assert
(
rc
==
0
);
test_context_socket_close
(
sub
);
// Receive unsubscription "B"
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
assert
(
rc
==
2
);
assert
(
buffer
[
0
]
==
0
);
assert
(
buffer
[
1
]
==
'B'
);
const
char
unsubscription2
[
2
]
=
{
0
,
'B'
};
TEST_ASSERT_EQUAL_INT
(
sizeof
unsubscription2
,
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_recv
(
pub
,
buffer
,
sizeof
buffer
,
0
)));
TEST_ASSERT_EQUAL_INT8_ARRAY
(
unsubscription2
,
buffer
,
sizeof
unsubscription2
);
// Unsubscribe socket from XB instead
rc
=
zmq_setsockopt
(
pub
,
ZMQ_UNSUBSCRIBE
,
"XB"
,
2
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pub
,
ZMQ_UNSUBSCRIBE
,
"XB"
,
2
));
// Clean up.
rc
=
zmq_close
(
pub
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
return
0
;
test_context_socket_close
(
pub
);
}
int
test_xpub_proxy_unsubscribe_on_disconnect
(
void
)
void
test_xpub_proxy_unsubscribe_on_disconnect
()
{
const
char
*
topic
=
"1"
;
const
char
*
payload
=
"X"
;
const
uint8_t
topic_buff
[]
=
{
"1"
}
;
const
uint8_t
payload_buff
[]
=
{
"X"
}
;
size_t
len
=
MAX_SOCKET_STRING
;
char
my_endpoint_backend
[
MAX_SOCKET_STRING
];
char
my_endpoint_frontend
[
MAX_SOCKET_STRING
];
int
manual
=
1
;
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
// proxy frontend
void
*
xsub_proxy
=
zmq_socket
(
ctx
,
ZMQ_XSUB
);
assert
(
xsub_proxy
);
assert
(
zmq_bind
(
xsub_proxy
,
"tcp://127.0.0.1:*"
)
==
0
);
int
rc
=
zmq_getsockopt
(
xsub_proxy
,
ZMQ_LAST_ENDPOINT
,
my_endpoint_frontend
,
&
len
);
assert
(
rc
==
0
);
void
*
xsub_proxy
=
test_context_socket
(
ZMQ_XSUB
);
bind_loopback_ipv4
(
xsub_proxy
,
my_endpoint_frontend
,
sizeof
my_endpoint_frontend
);
// proxy backend
void
*
xpub_proxy
=
zmq_socket
(
ctx
,
ZMQ_XPUB
);
assert
(
xpub_proxy
);
assert
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_XPUB_MANUAL
,
&
manual
,
4
)
==
0
);
assert
(
zmq_bind
(
xpub_proxy
,
"tcp://127.0.0.1:*"
)
==
0
);
len
=
MAX_SOCKET_STRING
;
rc
=
zmq_getsockopt
(
xpub_proxy
,
ZMQ_LAST_ENDPOINT
,
my_endpoint_backend
,
&
len
);
assert
(
rc
==
0
);
void
*
xpub_proxy
=
test_context_socket
(
ZMQ_XPUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_XPUB_MANUAL
,
&
manual
,
4
));
bind_loopback_ipv4
(
xpub_proxy
,
my_endpoint_backend
,
sizeof
my_endpoint_backend
);
// publisher
void
*
pub
=
zmq_socket
(
ctx
,
ZMQ_PUB
);
assert
(
zmq_connect
(
pub
,
my_endpoint_frontend
)
==
0
);
void
*
pub
=
test_context_socket
(
ZMQ_PUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
pub
,
my_endpoint_frontend
)
);
// first subscriber subscribes
void
*
sub1
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
sub1
);
assert
(
zmq_connect
(
sub1
,
my_endpoint_backend
)
==
0
);
assert
(
zmq_setsockopt
(
sub1
,
ZMQ_SUBSCRIBE
,
topic
,
1
)
==
0
);
void
*
sub1
=
test_context_socket
(
ZMQ_SUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sub1
,
my_endpoint_backend
)
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
sub1
,
ZMQ_SUBSCRIBE
,
topic_buff
,
1
)
);
// wait
msleep
(
SETTLE_TIME
);
// proxy reroutes and confirms subscriptions
char
sub_buff
[
2
];
assert
(
zmq_recv
(
xpub_proxy
,
sub_buff
,
2
,
ZMQ_DONTWAIT
)
==
2
);
assert
(
sub_buff
[
0
]
==
1
);
assert
(
sub_buff
[
1
]
==
*
topic
);
assert
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_SUBSCRIBE
,
topic
,
1
)
==
0
);
assert
(
zmq_send
(
xsub_proxy
,
sub_buff
,
2
,
0
)
==
2
);
const
uint8_t
subscription
[
2
]
=
{
1
,
*
topic_buff
};
recv_array_expect_success
(
xpub_proxy
,
subscription
,
ZMQ_DONTWAIT
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_SUBSCRIBE
,
topic_buff
,
1
));
send_array_expect_success
(
xsub_proxy
,
subscription
,
0
);
// second subscriber subscribes
void
*
sub2
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
sub2
);
assert
(
zmq_connect
(
sub2
,
my_endpoint_backend
)
==
0
);
assert
(
zmq_setsockopt
(
sub2
,
ZMQ_SUBSCRIBE
,
topic
,
1
)
==
0
);
void
*
sub2
=
test_context_socket
(
ZMQ_SUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sub2
,
my_endpoint_backend
)
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
sub2
,
ZMQ_SUBSCRIBE
,
topic_buff
,
1
)
);
// wait
msleep
(
SETTLE_TIME
);
// proxy reroutes
assert
(
zmq_recv
(
xpub_proxy
,
sub_buff
,
2
,
ZMQ_DONTWAIT
)
==
2
);
assert
(
sub_buff
[
0
]
==
1
);
assert
(
sub_buff
[
1
]
==
*
topic
);
assert
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_SUBSCRIBE
,
topic
,
1
)
==
0
);
assert
(
zmq_send
(
xsub_proxy
,
sub_buff
,
2
,
0
)
==
2
);
recv_array_expect_success
(
xpub_proxy
,
subscription
,
ZMQ_DONTWAIT
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_SUBSCRIBE
,
topic_buff
,
1
));
send_array_expect_success
(
xsub_proxy
,
subscription
,
0
);
// wait
msleep
(
SETTLE_TIME
);
// let publisher send a msg
assert
(
zmq_send
(
pub
,
topic
,
1
,
ZMQ_SNDMORE
)
==
1
);
assert
(
zmq_send
(
pub
,
payload
,
1
,
0
)
==
1
);
send_array_expect_success
(
pub
,
topic_buff
,
ZMQ_SNDMORE
);
send_array_expect_success
(
pub
,
payload_buff
,
0
);
// wait
msleep
(
SETTLE_TIME
);
// proxy reroutes data messages to subscribers
char
topic_buff
[
1
];
char
data_buff
[
1
];
assert
(
zmq_recv
(
xsub_proxy
,
topic_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
topic_buff
[
0
]
==
*
topic
);
assert
(
zmq_recv
(
xsub_proxy
,
data_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
data_buff
[
0
]
==
*
payload
);
assert
(
zmq_send
(
xpub_proxy
,
topic_buff
,
1
,
ZMQ_SNDMORE
)
==
1
);
assert
(
zmq_send
(
xpub_proxy
,
data_buff
,
1
,
0
)
==
1
);
recv_array_expect_success
(
xsub_proxy
,
topic_buff
,
ZMQ_DONTWAIT
);
recv_array_expect_success
(
xsub_proxy
,
payload_buff
,
ZMQ_DONTWAIT
);
send_array_expect_success
(
xpub_proxy
,
topic_buff
,
ZMQ_SNDMORE
);
send_array_expect_success
(
xpub_proxy
,
payload_buff
,
0
);
// wait
msleep
(
SETTLE_TIME
);
// each subscriber should now get a message
assert
(
zmq_recv
(
sub2
,
topic_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
topic_buff
[
0
]
==
*
topic
);
assert
(
zmq_recv
(
sub2
,
data_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
data_buff
[
0
]
==
*
payload
);
recv_array_expect_success
(
sub2
,
topic_buff
,
ZMQ_DONTWAIT
);
recv_array_expect_success
(
sub2
,
payload_buff
,
ZMQ_DONTWAIT
);
assert
(
zmq_recv
(
sub1
,
topic_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
topic_buff
[
0
]
==
*
topic
);
assert
(
zmq_recv
(
sub1
,
data_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
data_buff
[
0
]
==
*
payload
);
recv_array_expect_success
(
sub1
,
topic_buff
,
ZMQ_DONTWAIT
);
recv_array_expect_success
(
sub1
,
payload_buff
,
ZMQ_DONTWAIT
);
// Disconnect both subscribers
assert
(
zmq_close
(
sub1
)
==
0
);
assert
(
zmq_close
(
sub2
)
==
0
);
test_context_socket_close
(
sub1
);
test_context_socket_close
(
sub2
);
// wait
msleep
(
SETTLE_TIME
);
// unsubscribe messages are passed from proxy to publisher
assert
(
zmq_recv
(
xpub_proxy
,
sub_buff
,
2
,
0
)
==
2
)
;
assert
(
sub_buff
[
0
]
==
0
);
assert
(
sub_buff
[
1
]
==
*
topic
);
assert
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_UNSUBSCRIBE
,
topic
,
1
)
==
0
);
assert
(
zmq_send
(
xsub_proxy
,
sub_buff
,
2
,
0
)
==
2
);
const
uint8_t
unsubscription
[]
=
{
0
,
*
topic_buff
}
;
recv_array_expect_success
(
xpub_proxy
,
unsubscription
,
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_UNSUBSCRIBE
,
topic_buff
,
1
)
);
send_array_expect_success
(
xsub_proxy
,
unsubscription
,
0
);
// should receive another unsubscribe msg
assert
(
zmq_recv
(
xpub_proxy
,
sub_buff
,
2
,
0
)
==
2
&&
"Should receive the second unsubscribe message."
);
assert
(
sub_buff
[
0
]
==
0
);
assert
(
sub_buff
[
1
]
==
*
topic
);
assert
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_UNSUBSCRIBE
,
topic
,
1
)
==
0
);
assert
(
zmq_send
(
xsub_proxy
,
sub_buff
,
2
,
0
)
==
2
);
recv_array_expect_success
(
xpub_proxy
,
unsubscription
,
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_UNSUBSCRIBE
,
topic_buff
,
1
));
send_array_expect_success
(
xsub_proxy
,
unsubscription
,
0
);
// wait
msleep
(
SETTLE_TIME
);
// let publisher send a msg
assert
(
zmq_send
(
pub
,
topic
,
1
,
ZMQ_SNDMORE
)
==
1
);
assert
(
zmq_send
(
pub
,
payload
,
1
,
0
)
==
1
);
send_array_expect_success
(
pub
,
topic_buff
,
ZMQ_SNDMORE
);
send_array_expect_success
(
pub
,
payload_buff
,
0
);
// wait
msleep
(
SETTLE_TIME
);
// nothing should come to the proxy
assert
(
zmq_recv
(
xsub_proxy
,
topic_buff
,
1
,
ZMQ_DONTWAIT
)
==
-
1
);
assert
(
errno
==
EAGAIN
);
assert
(
zmq_close
(
pub
)
==
0
);
assert
(
zmq_close
(
xpub_proxy
)
==
0
);
assert
(
zmq_close
(
xsub_proxy
)
==
0
);
assert
(
zmq_ctx_term
(
ctx
)
==
0
);
char
buffer
[
1
];
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_recv
(
xsub_proxy
,
buffer
,
sizeof
buffer
,
ZMQ_DONTWAIT
));
return
0
;
test_context_socket_close
(
pub
);
test_context_socket_close
(
xpub_proxy
);
test_context_socket_close
(
xsub_proxy
);
}
int
test_missing_subscriptions
(
void
)
void
test_missing_subscriptions
(
)
{
const
char
*
topic1
=
"1"
;
const
char
*
topic2
=
"2"
;
const
char
*
payload
=
"X"
;
size_t
len
=
MAX_SOCKET_STRING
;
char
my_endpoint_backend
[
MAX_SOCKET_STRING
];
char
my_endpoint_frontend
[
MAX_SOCKET_STRING
];
int
manual
=
1
;
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
// proxy frontend
void
*
xsub_proxy
=
zmq_socket
(
ctx
,
ZMQ_XSUB
);
assert
(
xsub_proxy
);
assert
(
zmq_bind
(
xsub_proxy
,
"tcp://127.0.0.1:*"
)
==
0
);
int
rc
=
zmq_getsockopt
(
xsub_proxy
,
ZMQ_LAST_ENDPOINT
,
my_endpoint_frontend
,
&
len
);
assert
(
rc
==
0
);
void
*
xsub_proxy
=
test_context_socket
(
ZMQ_XSUB
);
bind_loopback_ipv4
(
xsub_proxy
,
my_endpoint_frontend
,
sizeof
my_endpoint_frontend
);
// proxy backend
void
*
xpub_proxy
=
zmq_socket
(
ctx
,
ZMQ_XPUB
);
assert
(
xpub_proxy
);
assert
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_XPUB_MANUAL
,
&
manual
,
4
)
==
0
);
assert
(
zmq_bind
(
xpub_proxy
,
"tcp://127.0.0.1:*"
)
==
0
);
len
=
MAX_SOCKET_STRING
;
rc
=
zmq_getsockopt
(
xpub_proxy
,
ZMQ_LAST_ENDPOINT
,
my_endpoint_backend
,
&
len
);
assert
(
rc
==
0
);
void
*
xpub_proxy
=
test_context_socket
(
ZMQ_XPUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_XPUB_MANUAL
,
&
manual
,
4
));
bind_loopback_ipv4
(
xpub_proxy
,
my_endpoint_backend
,
sizeof
my_endpoint_backend
);
// publisher
void
*
pub
=
zmq_socket
(
ctx
,
ZMQ_PUB
);
assert
(
zmq_connect
(
pub
,
my_endpoint_frontend
)
==
0
);
void
*
pub
=
test_context_socket
(
ZMQ_PUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
pub
,
my_endpoint_frontend
)
);
// Here's the problem: because subscribers subscribe in quick succession,
// the proxy is unable to confirm the first subscription before receiving
// the second. This causes the first subscription to get lost.
// first subscriber
void
*
sub1
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
sub1
);
assert
(
zmq_connect
(
sub1
,
my_endpoint_backend
)
==
0
);
assert
(
zmq_setsockopt
(
sub1
,
ZMQ_SUBSCRIBE
,
topic1
,
1
)
==
0
);
void
*
sub1
=
test_context_socket
(
ZMQ_SUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sub1
,
my_endpoint_backend
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
sub1
,
ZMQ_SUBSCRIBE
,
topic1
,
1
));
// second subscriber
void
*
sub2
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
sub2
);
assert
(
zmq_connect
(
sub2
,
my_endpoint_backend
)
==
0
);
assert
(
zmq_setsockopt
(
sub2
,
ZMQ_SUBSCRIBE
,
topic2
,
1
)
==
0
);
void
*
sub2
=
test_context_socket
(
ZMQ_SUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sub2
,
my_endpoint_backend
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
sub2
,
ZMQ_SUBSCRIBE
,
topic2
,
1
));
// wait
msleep
(
SETTLE_TIME
);
// proxy now reroutes and confirms subscriptions
char
buffer
[
2
];
assert
(
zmq_recv
(
xpub_proxy
,
buffer
,
2
,
ZMQ_DONTWAIT
)
==
2
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
*
topic1
);
assert
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_SUBSCRIBE
,
topic1
,
1
)
==
0
);
assert
(
zmq_send
(
xsub_proxy
,
buffer
,
2
,
0
)
==
2
);
assert
(
zmq_recv
(
xpub_proxy
,
buffer
,
2
,
ZMQ_DONTWAIT
)
==
2
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
*
topic2
);
assert
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_SUBSCRIBE
,
topic2
,
1
)
==
0
);
assert
(
zmq_send
(
xsub_proxy
,
buffer
,
2
,
0
)
==
2
);
const
uint8_t
subscription1
[]
=
{
1
,
static_cast
<
uint8_t
>
(
topic1
[
0
])};
recv_array_expect_success
(
xpub_proxy
,
subscription1
,
ZMQ_DONTWAIT
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_SUBSCRIBE
,
topic1
,
1
));
send_array_expect_success
(
xsub_proxy
,
subscription1
,
0
);
const
uint8_t
subscription2
[]
=
{
1
,
static_cast
<
uint8_t
>
(
topic2
[
0
])};
recv_array_expect_success
(
xpub_proxy
,
subscription2
,
ZMQ_DONTWAIT
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
xpub_proxy
,
ZMQ_SUBSCRIBE
,
topic2
,
1
));
send_array_expect_success
(
xsub_proxy
,
subscription2
,
0
);
// wait
msleep
(
SETTLE_TIME
);
// let publisher send 2 msgs, each with its own topic
assert
(
zmq_send
(
pub
,
topic1
,
1
,
ZMQ_SNDMORE
)
==
1
);
assert
(
zmq_send
(
pub
,
payload
,
1
,
0
)
==
1
);
assert
(
zmq_send
(
pub
,
topic2
,
1
,
ZMQ_SNDMORE
)
==
1
);
assert
(
zmq_send
(
pub
,
payload
,
1
,
0
)
==
1
);
// let publisher send 2 msgs, each with its own topic
_buff
send_string_expect_success
(
pub
,
topic1
,
ZMQ_SNDMORE
);
send_string_expect_success
(
pub
,
payload
,
0
);
send_string_expect_success
(
pub
,
topic2
,
ZMQ_SNDMORE
);
send_string_expect_success
(
pub
,
payload
,
0
);
// wait
msleep
(
SETTLE_TIME
);
// proxy reroutes data messages to subscribers
char
topic_buff
[
1
];
char
data_buff
[
1
];
assert
(
zmq_recv
(
xsub_proxy
,
topic_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
topic_buff
[
0
]
==
*
topic1
);
assert
(
zmq_recv
(
xsub_proxy
,
data_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
data_buff
[
0
]
==
*
payload
);
assert
(
zmq_send
(
xpub_proxy
,
topic_buff
,
1
,
ZMQ_SNDMORE
)
==
1
);
assert
(
zmq_send
(
xpub_proxy
,
data_buff
,
1
,
0
)
==
1
);
assert
(
zmq_recv
(
xsub_proxy
,
topic_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
topic_buff
[
0
]
==
*
topic2
);
assert
(
zmq_recv
(
xsub_proxy
,
data_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
data_buff
[
0
]
==
*
payload
);
assert
(
zmq_send
(
xpub_proxy
,
topic_buff
,
1
,
ZMQ_SNDMORE
)
==
1
);
assert
(
zmq_send
(
xpub_proxy
,
data_buff
,
1
,
0
)
==
1
);
recv_string_expect_success
(
xsub_proxy
,
topic1
,
ZMQ_DONTWAIT
);
recv_string_expect_success
(
xsub_proxy
,
payload
,
ZMQ_DONTWAIT
);
send_string_expect_success
(
xpub_proxy
,
topic1
,
ZMQ_SNDMORE
);
send_string_expect_success
(
xpub_proxy
,
payload
,
0
);
recv_string_expect_success
(
xsub_proxy
,
topic2
,
ZMQ_DONTWAIT
);
recv_string_expect_success
(
xsub_proxy
,
payload
,
ZMQ_DONTWAIT
);
send_string_expect_success
(
xpub_proxy
,
topic2
,
ZMQ_SNDMORE
);
send_string_expect_success
(
xpub_proxy
,
payload
,
0
);
// wait
msleep
(
SETTLE_TIME
);
// each subscriber should now get a message
assert
(
zmq_recv
(
sub2
,
topic_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
topic_buff
[
0
]
==
*
topic2
);
assert
(
zmq_recv
(
sub2
,
data_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
data_buff
[
0
]
==
*
payload
);
recv_string_expect_success
(
sub2
,
topic2
,
ZMQ_DONTWAIT
);
recv_string_expect_success
(
sub2
,
payload
,
ZMQ_DONTWAIT
);
assert
(
zmq_recv
(
sub1
,
topic_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
topic_buff
[
0
]
==
*
topic1
);
assert
(
zmq_recv
(
sub1
,
data_buff
,
1
,
ZMQ_DONTWAIT
)
==
1
);
assert
(
data_buff
[
0
]
==
*
payload
);
recv_string_expect_success
(
sub1
,
topic1
,
ZMQ_DONTWAIT
);
recv_string_expect_success
(
sub1
,
payload
,
ZMQ_DONTWAIT
);
// Clean up
assert
(
zmq_close
(
sub1
)
==
0
);
assert
(
zmq_close
(
sub2
)
==
0
);
assert
(
zmq_close
(
pub
)
==
0
);
assert
(
zmq_close
(
xpub_proxy
)
==
0
);
assert
(
zmq_close
(
xsub_proxy
)
==
0
);
assert
(
zmq_ctx_term
(
ctx
)
==
0
);
return
0
;
test_context_socket_close
(
sub1
);
test_context_socket_close
(
sub2
);
test_context_socket_close
(
pub
);
test_context_socket_close
(
xpub_proxy
);
test_context_socket_close
(
xsub_proxy
);
}
int
test_unsubscribe_cleanup
(
void
)
void
test_unsubscribe_cleanup
()
{
size_t
len
=
MAX_SOCKET_STRING
;
char
my_endpoint
[
MAX_SOCKET_STRING
];
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
// Create a publisher
void
*
pub
=
zmq_socket
(
ctx
,
ZMQ_XPUB
);
assert
(
pub
);
void
*
pub
=
test_context_socket
(
ZMQ_XPUB
);
int
manual
=
1
;
int
rc
=
zmq_setsockopt
(
pub
,
ZMQ_XPUB_MANUAL
,
&
manual
,
4
);
assert
(
rc
==
0
);
rc
=
zmq_bind
(
pub
,
"tcp://127.0.0.1:*"
);
assert
(
rc
==
0
);
rc
=
zmq_getsockopt
(
pub
,
ZMQ_LAST_ENDPOINT
,
my_endpoint
,
&
len
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pub
,
ZMQ_XPUB_MANUAL
,
&
manual
,
4
));
bind_loopback_ipv4
(
pub
,
my_endpoint
,
sizeof
my_endpoint
);
// Create a subscriber
void
*
sub
=
zmq_socket
(
ctx
,
ZMQ_XSUB
);
assert
(
sub
);
rc
=
zmq_connect
(
sub
,
my_endpoint
);
assert
(
rc
==
0
);
void
*
sub
=
test_context_socket
(
ZMQ_XSUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sub
,
my_endpoint
));
// Subscribe for A
char
subscription
[
2
]
=
{
1
,
'A'
};
rc
=
zmq_send_const
(
sub
,
subscription
,
2
,
0
);
assert
(
rc
==
2
);
const
uint8_t
subscription1
[
2
]
=
{
1
,
'A'
};
send_array_expect_success
(
sub
,
subscription1
,
0
);
char
buffer
[
2
];
// Receive subscriptions from subscriber
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
assert
(
rc
==
2
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'A'
);
rc
=
zmq_setsockopt
(
pub
,
ZMQ_SUBSCRIBE
,
"XA"
,
2
);
assert
(
rc
==
0
);
recv_array_expect_success
(
pub
,
subscription1
,
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pub
,
ZMQ_SUBSCRIBE
,
"XA"
,
2
));
// send 2 messages
rc
=
zmq_send_const
(
pub
,
"XA"
,
2
,
0
);
assert
(
rc
==
2
);
rc
=
zmq_send_const
(
pub
,
"XB"
,
2
,
0
);
assert
(
rc
==
2
);
send_string_expect_success
(
pub
,
"XA"
,
0
);
send_string_expect_success
(
pub
,
"XB"
,
0
);
// receive the single message
rc
=
zmq_recv
(
sub
,
buffer
,
2
,
0
);
assert
(
rc
==
2
);
assert
(
buffer
[
0
]
==
'X'
);
assert
(
buffer
[
1
]
==
'A'
);
recv_string_expect_success
(
sub
,
"XA"
,
0
);
// should be nothing left in the queue
rc
=
zmq_recv
(
sub
,
buffer
,
2
,
ZMQ_DONTWAIT
);
assert
(
rc
==
-
1
);
char
buffer
[
2
];
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_recv
(
sub
,
buffer
,
sizeof
buffer
,
ZMQ_DONTWAIT
));
// close the socket
rc
=
zmq_close
(
sub
);
assert
(
rc
==
0
);
test_context_socket_close
(
sub
);
// closing the socket will result in an unsubscribe event
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
assert
(
rc
==
2
);
assert
(
buffer
[
0
]
==
0
);
assert
(
buffer
[
1
]
==
'A'
);
const
uint8_t
unsubscription
[
2
]
=
{
0
,
'A'
};
recv_array_expect_success
(
pub
,
unsubscription
,
0
);
// this doesn't really do anything
// there is no last_pipe set it will just fail silently
rc
=
zmq_setsockopt
(
pub
,
ZMQ_UNSUBSCRIBE
,
"XA"
,
2
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pub
,
ZMQ_UNSUBSCRIBE
,
"XA"
,
2
));
// reconnect
sub
=
zmq_socket
(
ctx
,
ZMQ_XSUB
);
rc
=
zmq_connect
(
sub
,
my_endpoint
);
assert
(
rc
==
0
);
sub
=
test_context_socket
(
ZMQ_XSUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sub
,
my_endpoint
));
// send a subscription for B
subscription
[
0
]
=
1
;
subscription
[
1
]
=
'B'
;
rc
=
zmq_send
(
sub
,
subscription
,
2
,
0
);
assert
(
rc
==
2
);
const
uint8_t
subscription2
[
2
]
=
{
1
,
'B'
};
send_array_expect_success
(
sub
,
subscription2
,
0
);
// receive the subscription, overwrite it to XB
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
assert
(
rc
==
2
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'B'
);
rc
=
zmq_setsockopt
(
pub
,
ZMQ_SUBSCRIBE
,
"XB"
,
2
);
assert
(
rc
==
0
);
recv_array_expect_success
(
pub
,
subscription2
,
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pub
,
ZMQ_SUBSCRIBE
,
"XB"
,
2
));
// send 2 messages
rc
=
zmq_send_const
(
pub
,
"XA"
,
2
,
0
);
assert
(
rc
==
2
);
rc
=
zmq_send_const
(
pub
,
"XB"
,
2
,
0
);
assert
(
rc
==
2
);
send_string_expect_success
(
pub
,
"XA"
,
0
);
send_string_expect_success
(
pub
,
"XB"
,
0
);
// receive the single message
rc
=
zmq_recv
(
sub
,
buffer
,
2
,
0
);
assert
(
rc
==
2
);
assert
(
buffer
[
0
]
==
'X'
);
assert
(
buffer
[
1
]
==
'B'
);
// this assertion will fail
recv_string_expect_success
(
sub
,
"XB"
,
0
);
// should be nothing left in the queue
rc
=
zmq_recv
(
sub
,
buffer
,
2
,
ZMQ_DONTWAIT
);
assert
(
rc
==
-
1
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_recv
(
sub
,
buffer
,
sizeof
buffer
,
ZMQ_DONTWAIT
)
);
// Clean up.
rc
=
zmq_close
(
pub
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
sub
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
return
0
;
test_context_socket_close
(
pub
);
test_context_socket_close
(
sub
);
}
int
main
(
void
)
int
main
()
{
setup_test_environment
();
test_basic
();
test_unsubscribe_manual
();
test_xpub_proxy_unsubscribe_on_disconnect
();
test_missing_subscriptions
();
test_unsubscribe_cleanup
();
return
0
;
UNITY_BEGIN
();
RUN_TEST
(
test_basic
);
RUN_TEST
(
test_unsubscribe_manual
);
RUN_TEST
(
test_xpub_proxy_unsubscribe_on_disconnect
);
RUN_TEST
(
test_missing_subscriptions
);
RUN_TEST
(
test_unsubscribe_cleanup
);
return
UNITY_END
();
}
tests/testutil_unity.hpp
View file @
713f075f
...
...
@@ -118,6 +118,32 @@ void recv_string_expect_success (void *socket_, const char *str_, int flags_)
TEST_ASSERT_EQUAL_STRING_LEN
(
str_
,
buffer
,
len
);
}
template
<
size_t
SIZE
>
void
send_array_expect_success
(
void
*
socket_
,
const
uint8_t
(
&
array_
)[
SIZE
],
int
flags_
)
{
const
int
rc
=
zmq_send
(
socket_
,
array_
,
SIZE
,
flags_
);
TEST_ASSERT_EQUAL_INT
(
static_cast
<
int
>
(
SIZE
),
rc
);
}
template
<
size_t
SIZE
>
void
recv_array_expect_success
(
void
*
socket_
,
const
uint8_t
(
&
array_
)[
SIZE
],
int
flags_
)
{
char
buffer
[
255
];
TEST_ASSERT_LESS_OR_EQUAL_MESSAGE
(
sizeof
(
buffer
),
SIZE
,
"recv_string_expect_success cannot be "
"used for strings longer than 255 "
"characters"
);
const
int
rc
=
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_recv
(
socket_
,
buffer
,
sizeof
(
buffer
),
flags_
));
TEST_ASSERT_EQUAL_INT
(
static_cast
<
int
>
(
SIZE
),
rc
);
TEST_ASSERT_EQUAL_UINT8_ARRAY
(
array_
,
buffer
,
SIZE
);
}
// do not call from tests directly, use setup_test_context, get_test_context and teardown_test_context only
void
*
internal_manage_test_context
(
bool
init_
,
bool
clear_
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment