Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
1a55100d
Commit
1a55100d
authored
6 years ago
by
Simon Giesecke
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Problem: test_xpub_verbose is not using testutil_unity
Solution: use utilities from testutil_unity, reduce duplication
parent
23340907
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
131 additions
and
276 deletions
+131
-276
test_xpub_verbose.cpp
tests/test_xpub_verbose.cpp
+131
-276
No files found.
tests/test_xpub_verbose.cpp
View file @
1a55100d
...
@@ -28,448 +28,303 @@
...
@@ -28,448 +28,303 @@
*/
*/
#include "testutil.hpp"
#include "testutil.hpp"
#include "testutil_unity.hpp"
#include <unity.h>
void
setUp
()
void
setUp
()
{
{
setup_test_context
();
}
}
void
tearDown
()
void
tearDown
()
{
{
teardown_test_context
();
}
}
const
uint8_t
unsubscribe_a_msg
[]
=
{
0
,
'A'
};
const
uint8_t
subscribe_a_msg
[]
=
{
1
,
'A'
};
const
uint8_t
subscribe_b_msg
[]
=
{
1
,
'B'
};
const
char
test_endpoint
[]
=
"inproc://soname"
;
const
char
topic_a
[]
=
"A"
;
const
char
topic_b
[]
=
"B"
;
void
test_xpub_verbose_one_sub
()
void
test_xpub_verbose_one_sub
()
{
{
int
rc
;
void
*
pub
=
test_context_socket
(
ZMQ_XPUB
);
char
buffer
[
2
];
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
pub
,
test_endpoint
));
void
*
ctx
=
zmq_ctx_new
();
TEST_ASSERT_NOT_NULL
(
ctx
);
void
*
pub
=
zmq_socket
(
ctx
,
ZMQ_XPUB
);
void
*
sub
=
test_context_socket
(
ZMQ_SUB
);
TEST_ASSERT_NOT_NULL
(
pub
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sub
,
test_endpoint
));
rc
=
zmq_bind
(
pub
,
"inproc://soname"
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
void
*
sub
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
TEST_ASSERT_NOT_NULL
(
sub
);
rc
=
zmq_connect
(
sub
,
"inproc://soname"
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
// Subscribe for A
// Subscribe for A
rc
=
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
topic_a
,
1
));
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
// Receive subscriptions from subscriber
// Receive subscriptions from subscriber
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
subscribe_a_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'A'
);
// Subscribe socket for B instead
// Subscribe socket for B instead
rc
=
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
"B"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
topic_b
,
1
));
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
// Receive subscriptions from subscriber
// Receive subscriptions from subscriber
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
subscribe_b_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'B'
);
// Subscribe again for A again
// Subscribe again for A again
rc
=
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
topic_a
,
1
));
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
// This time it is duplicated, so it will be filtered out
// This time it is duplicated, so it will be filtered out
rc
=
zmq_recv
(
pub
,
buffer
,
1
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_recv
(
pub
,
NULL
,
0
,
ZMQ_DONTWAIT
));
TEST_ASSERT_EQUAL_INT
(
-
1
,
rc
);
TEST_ASSERT_EQUAL_INT
(
EAGAIN
,
errno
);
int
verbose
=
1
;
int
verbose
=
1
;
rc
=
zmq_setsockopt
(
pub
,
ZMQ_XPUB_VERBOSE
,
&
verbose
,
sizeof
(
int
));
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
pub
,
ZMQ_XPUB_VERBOSE
,
&
verbose
,
sizeof
(
int
))
);
// Subscribe socket for A again
// Subscribe socket for A again
rc
=
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
topic_a
,
1
));
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
// This time with VERBOSE the duplicated sub will be received
// This time with VERBOSE the duplicated sub will be received
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
subscribe_a_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'A'
);
// Sending A message and B Message
// Sending A message and B Message
rc
=
zmq_send_const
(
pub
,
"A"
,
1
,
0
);
send_string_expect_success
(
pub
,
topic_a
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
send_string_expect_success
(
pub
,
topic_b
,
0
);
rc
=
zmq_send_const
(
pub
,
"B"
,
1
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
rc
=
zmq_recv
(
sub
,
buffer
,
1
,
0
);
recv_string_expect_success
(
sub
,
topic_a
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
recv_string_expect_success
(
sub
,
topic_b
,
0
);
assert
(
buffer
[
0
]
==
'A'
);
rc
=
zmq_recv
(
sub
,
buffer
,
1
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
assert
(
buffer
[
0
]
==
'B'
);
// Clean up.
// Clean up.
rc
=
zmq_close
(
pub
);
test_context_socket_close
(
pub
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
test_context_socket_close
(
sub
);
rc
=
zmq_close
(
sub
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
rc
=
zmq_ctx_term
(
ctx
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
}
}
void
create_xpub_with_2_subs
(
void
*
ctx_
,
void
create_xpub_with_2_subs
(
void
**
pub_
,
void
**
sub0_
,
void
**
sub1_
)
void
**
pub_
,
void
**
sub0_
,
void
**
sub1_
)
{
{
*
pub_
=
zmq_socket
(
ctx_
,
ZMQ_XPUB
);
*
pub_
=
test_context_socket
(
ZMQ_XPUB
);
TEST_ASSERT_NOT_NULL
(
*
pub_
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
*
pub_
,
test_endpoint
));
int
rc
=
zmq_bind
(
*
pub_
,
"inproc://soname"
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
*
sub0_
=
test_context_socket
(
ZMQ_SUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
*
sub0_
,
test_endpoint
));
*
sub0_
=
zmq_socket
(
ctx_
,
ZMQ_SUB
);
TEST_ASSERT_NOT_NULL
(
*
sub0_
);
*
sub1_
=
test_context_socket
(
ZMQ_SUB
);
rc
=
zmq_connect
(
*
sub0_
,
"inproc://soname"
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
*
sub1_
,
test_endpoint
));
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
*
sub1_
=
zmq_socket
(
ctx_
,
ZMQ_SUB
);
TEST_ASSERT_NOT_NULL
(
*
sub1_
);
rc
=
zmq_connect
(
*
sub1_
,
"inproc://soname"
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
}
}
void
create_duplicate_subscription
(
void
*
pub_
,
void
*
sub0_
,
void
*
sub1_
)
void
create_duplicate_subscription
(
void
*
pub_
,
void
*
sub0_
,
void
*
sub1_
)
{
{
// Subscribe for A
// Subscribe for A
int
rc
=
zmq_setsockopt
(
sub0_
,
ZMQ_SUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub0_
,
ZMQ_SUBSCRIBE
,
topic_a
,
1
)
);
// Receive subscriptions from subscriber
// Receive subscriptions from subscriber
char
buffer
[
2
];
recv_array_expect_success
(
pub_
,
subscribe_a_msg
,
0
);
rc
=
zmq_recv
(
pub_
,
buffer
,
2
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'A'
);
// Subscribe again for A on the other socket
// Subscribe again for A on the other socket
rc
=
zmq_setsockopt
(
sub1_
,
ZMQ_SUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub1_
,
ZMQ_SUBSCRIBE
,
topic_a
,
1
)
);
// This time it is duplicated, so it will be filtered out by XPUB
// This time it is duplicated, so it will be filtered out by XPUB
rc
=
zmq_recv
(
pub_
,
buffer
,
1
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_recv
(
pub_
,
NULL
,
0
,
ZMQ_DONTWAIT
));
TEST_ASSERT_EQUAL_INT
(
-
1
,
rc
);
TEST_ASSERT_EQUAL_INT
(
EAGAIN
,
errno
);
}
}
void
test_xpub_verbose_two_subs
()
void
test_xpub_verbose_two_subs
()
{
{
int
rc
;
char
buffer
[
2
];
void
*
ctx
=
zmq_ctx_new
();
TEST_ASSERT_NOT_NULL
(
ctx
);
void
*
pub
,
*
sub0
,
*
sub1
;
void
*
pub
,
*
sub0
,
*
sub1
;
create_xpub_with_2_subs
(
ctx
,
&
pub
,
&
sub0
,
&
sub1
);
create_xpub_with_2_subs
(
&
pub
,
&
sub0
,
&
sub1
);
create_duplicate_subscription
(
pub
,
sub0
,
sub1
);
create_duplicate_subscription
(
pub
,
sub0
,
sub1
);
// Subscribe socket for B instead
// Subscribe socket for B instead
rc
=
zmq_setsockopt
(
sub0
,
ZMQ_SUBSCRIBE
,
"B"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub0
,
ZMQ_SUBSCRIBE
,
topic_b
,
1
)
);
// Receive subscriptions from subscriber
// Receive subscriptions from subscriber
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
subscribe_b_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'B'
);
int
verbose
=
1
;
int
verbose
=
1
;
rc
=
zmq_setsockopt
(
pub
,
ZMQ_XPUB_VERBOSE
,
&
verbose
,
sizeof
(
int
));
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
pub
,
ZMQ_XPUB_VERBOSE
,
&
verbose
,
sizeof
(
int
))
);
// Subscribe socket for A again
// Subscribe socket for A again
rc
=
zmq_setsockopt
(
sub1
,
ZMQ_SUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub1
,
ZMQ_SUBSCRIBE
,
topic_a
,
1
)
);
// This time with VERBOSE the duplicated sub will be received
// This time with VERBOSE the duplicated sub will be received
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
subscribe_a_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'A'
);
// Sending A message and B Message
// Sending A message and B Message
rc
=
zmq_send_const
(
pub
,
"A"
,
1
,
0
);
send_string_expect_success
(
pub
,
topic_a
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
rc
=
zmq_send_const
(
pub
,
"B"
,
1
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
rc
=
zmq_recv
(
sub0
,
buffer
,
1
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
assert
(
buffer
[
0
]
==
'A'
);
rc
=
zmq_recv
(
sub1
,
buffer
,
1
,
0
);
send_string_expect_success
(
pub
,
topic_b
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
assert
(
buffer
[
0
]
==
'A'
);
r
c
=
zmq_recv
(
sub0
,
buffer
,
1
,
0
);
r
ecv_string_expect_success
(
sub0
,
topic_a
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
recv_string_expect_success
(
sub1
,
topic_a
,
0
);
assert
(
buffer
[
0
]
==
'B'
);
recv_string_expect_success
(
sub0
,
topic_b
,
0
);
// Clean up.
// Clean up.
rc
=
zmq_close
(
pub
);
test_context_socket_close
(
pub
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
test_context_socket_close
(
sub0
);
rc
=
zmq_close
(
sub0
);
test_context_socket_close
(
sub1
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
rc
=
zmq_close
(
sub1
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
rc
=
zmq_ctx_term
(
ctx
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
}
}
void
test_xpub_verboser_one_sub
()
void
test_xpub_verboser_one_sub
()
{
{
int
rc
;
char
buffer
[
3
];
void
*
ctx
=
zmq_ctx_new
();
TEST_ASSERT_NOT_NULL
(
ctx
);
// Create a publisher
// Create a publisher
void
*
pub
=
zmq_socket
(
ctx
,
ZMQ_XPUB
);
void
*
pub
=
test_context_socket
(
ZMQ_XPUB
);
TEST_ASSERT_NOT_NULL
(
pub
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
pub
,
test_endpoint
));
rc
=
zmq_bind
(
pub
,
"inproc://soname"
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
// Create a subscriber
// Create a subscriber
void
*
sub
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
void
*
sub
=
test_context_socket
(
ZMQ_SUB
);
TEST_ASSERT_NOT_NULL
(
sub
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sub
,
test_endpoint
));
rc
=
zmq_connect
(
sub
,
"inproc://soname"
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
// Unsubscribe for A, does not exist yet
// Unsubscribe for A, does not exist yet
rc
=
zmq_setsockopt
(
sub
,
ZMQ_UNSUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub
,
ZMQ_UNSUBSCRIBE
,
topic_a
,
1
)
);
// Does not exist, so it will be filtered out by XSUB
// Does not exist, so it will be filtered out by XSUB
rc
=
zmq_recv
(
pub
,
buffer
,
1
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_recv
(
pub
,
NULL
,
0
,
ZMQ_DONTWAIT
));
TEST_ASSERT_EQUAL_INT
(
-
1
,
rc
);
TEST_ASSERT_EQUAL_INT
(
EAGAIN
,
errno
);
// Subscribe for A
// Subscribe for A
rc
=
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
topic_a
,
1
));
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
// Receive subscriptions from subscriber
// Receive subscriptions from subscriber
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
subscribe_a_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'A'
);
// Subscribe again for A again, XSUB will increase refcount
// Subscribe again for A again, XSUB will increase refcount
rc
=
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
topic_a
,
1
));
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
// This time it is duplicated, so it will be filtered out by XPUB
// This time it is duplicated, so it will be filtered out by XPUB
rc
=
zmq_recv
(
pub
,
buffer
,
1
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_recv
(
pub
,
NULL
,
0
,
ZMQ_DONTWAIT
));
TEST_ASSERT_EQUAL_INT
(
-
1
,
rc
);
TEST_ASSERT_EQUAL_INT
(
EAGAIN
,
errno
);
// Unsubscribe for A, this time it exists in XPUB
// Unsubscribe for A, this time it exists in XPUB
rc
=
zmq_setsockopt
(
sub
,
ZMQ_UNSUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub
,
ZMQ_UNSUBSCRIBE
,
topic_a
,
1
)
);
// XSUB refcounts and will not actually send unsub to PUB until the number
// XSUB refcounts and will not actually send unsub to PUB until the number
// of unsubs match the earlier subs
// of unsubs match the earlier subs
rc
=
zmq_setsockopt
(
sub
,
ZMQ_UNSUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub
,
ZMQ_UNSUBSCRIBE
,
topic_a
,
1
)
);
// Receive unsubscriptions from subscriber
// Receive unsubscriptions from subscriber
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
unsubscribe_a_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
0
);
assert
(
buffer
[
1
]
==
'A'
);
// XSUB only sends the last and final unsub, so XPUB will only receive 1
// XSUB only sends the last and final unsub, so XPUB will only receive 1
rc
=
zmq_recv
(
pub
,
buffer
,
1
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_recv
(
pub
,
NULL
,
0
,
ZMQ_DONTWAIT
));
TEST_ASSERT_EQUAL_INT
(
-
1
,
rc
);
TEST_ASSERT_EQUAL_INT
(
EAGAIN
,
errno
);
// Unsubscribe for A, does not exist anymore
// Unsubscribe for A, does not exist anymore
rc
=
zmq_setsockopt
(
sub
,
ZMQ_UNSUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub
,
ZMQ_UNSUBSCRIBE
,
topic_a
,
1
)
);
// Does not exist, so it will be filtered out by XSUB
// Does not exist, so it will be filtered out by XSUB
rc
=
zmq_recv
(
pub
,
buffer
,
1
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_recv
(
pub
,
NULL
,
0
,
ZMQ_DONTWAIT
));
TEST_ASSERT_EQUAL_INT
(
-
1
,
rc
);
TEST_ASSERT_EQUAL_INT
(
EAGAIN
,
errno
);
int
verbose
=
1
;
int
verbose
=
1
;
rc
=
zmq_setsockopt
(
pub
,
ZMQ_XPUB_VERBOSER
,
&
verbose
,
sizeof
(
int
));
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
pub
,
ZMQ_XPUB_VERBOSER
,
&
verbose
,
sizeof
(
int
))
);
// Subscribe socket for A again
// Subscribe socket for A again
rc
=
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
topic_a
,
1
));
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
// Receive subscriptions from subscriber, did not exist anymore
// Receive subscriptions from subscriber, did not exist anymore
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
subscribe_a_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'A'
);
// Sending A message to make sure everything still works
// Sending A message to make sure everything still works
rc
=
zmq_send_const
(
pub
,
"A"
,
1
,
0
);
send_string_expect_success
(
pub
,
topic_a
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
rc
=
zmq_recv
(
sub
,
buffer
,
1
,
0
);
recv_string_expect_success
(
sub
,
topic_a
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
assert
(
buffer
[
0
]
==
'A'
);
// Unsubscribe for A, this time it exists
// Unsubscribe for A, this time it exists
rc
=
zmq_setsockopt
(
sub
,
ZMQ_UNSUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub
,
ZMQ_UNSUBSCRIBE
,
topic_a
,
1
)
);
// Receive unsubscriptions from subscriber
// Receive unsubscriptions from subscriber
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
unsubscribe_a_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
0
);
assert
(
buffer
[
1
]
==
'A'
);
// Unsubscribe for A again, it does not exist anymore so XSUB will filter
// Unsubscribe for A again, it does not exist anymore so XSUB will filter
rc
=
zmq_setsockopt
(
sub
,
ZMQ_UNSUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub
,
ZMQ_UNSUBSCRIBE
,
topic_a
,
1
)
);
// XSUB only sends unsub if it matched it in its trie, IOW: it will only
// XSUB only sends unsub if it matched it in its trie, IOW: it will only
// send it if it existed in the first place even with XPUB_VERBBOSER
// send it if it existed in the first place even with XPUB_VERBBOSER
rc
=
zmq_recv
(
pub
,
buffer
,
1
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_recv
(
pub
,
NULL
,
0
,
ZMQ_DONTWAIT
));
TEST_ASSERT_EQUAL_INT
(
-
1
,
rc
);
TEST_ASSERT_EQUAL_INT
(
EAGAIN
,
errno
);
// Clean up.
// Clean up.
rc
=
zmq_close
(
pub
);
test_context_socket_close
(
pub
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
test_context_socket_close
(
sub
);
rc
=
zmq_close
(
sub
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
rc
=
zmq_ctx_term
(
ctx
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
}
}
void
test_xpub_verboser_two_subs
()
void
test_xpub_verboser_two_subs
()
{
{
int
rc
;
char
buffer
[
3
];
void
*
ctx
=
zmq_ctx_new
();
TEST_ASSERT_NOT_NULL
(
ctx
);
void
*
pub
,
*
sub0
,
*
sub1
;
void
*
pub
,
*
sub0
,
*
sub1
;
create_xpub_with_2_subs
(
ctx
,
&
pub
,
&
sub0
,
&
sub1
);
create_xpub_with_2_subs
(
&
pub
,
&
sub0
,
&
sub1
);
create_duplicate_subscription
(
pub
,
sub0
,
sub1
);
create_duplicate_subscription
(
pub
,
sub0
,
sub1
);
// Unsubscribe for A, this time it exists in XPUB
// Unsubscribe for A, this time it exists in XPUB
rc
=
zmq_setsockopt
(
sub0
,
ZMQ_UNSUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub0
,
ZMQ_UNSUBSCRIBE
,
topic_a
,
1
)
);
// sub1 is still subscribed, so no notification
// sub1 is still subscribed, so no notification
rc
=
zmq_recv
(
pub
,
buffer
,
1
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_recv
(
pub
,
NULL
,
0
,
ZMQ_DONTWAIT
));
TEST_ASSERT_EQUAL_INT
(
-
1
,
rc
);
TEST_ASSERT_EQUAL_INT
(
EAGAIN
,
errno
);
// Unsubscribe the second socket to trigger the notification
// Unsubscribe the second socket to trigger the notification
rc
=
zmq_setsockopt
(
sub1
,
ZMQ_UNSUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub1
,
ZMQ_UNSUBSCRIBE
,
topic_a
,
1
)
);
// Receive unsubscriptions since all sockets are gone
// Receive unsubscriptions since all sockets are gone
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
unsubscribe_a_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
0
);
assert
(
buffer
[
1
]
==
'A'
);
// Make really sure there is only one notification
// Make really sure there is only one notification
rc
=
zmq_recv
(
pub
,
buffer
,
1
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_recv
(
pub
,
NULL
,
0
,
ZMQ_DONTWAIT
));
TEST_ASSERT_EQUAL_INT
(
-
1
,
rc
);
TEST_ASSERT_EQUAL_INT
(
EAGAIN
,
errno
);
int
verbose
=
1
;
int
verbose
=
1
;
rc
=
zmq_setsockopt
(
pub
,
ZMQ_XPUB_VERBOSER
,
&
verbose
,
sizeof
(
int
));
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
pub
,
ZMQ_XPUB_VERBOSER
,
&
verbose
,
sizeof
(
int
))
);
// Subscribe socket for A again
// Subscribe socket for A again
rc
=
zmq_setsockopt
(
sub0
,
ZMQ_SUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub0
,
ZMQ_SUBSCRIBE
,
topic_a
,
1
)
);
// Subscribe socket for A again
// Subscribe socket for A again
rc
=
zmq_setsockopt
(
sub1
,
ZMQ_SUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub1
,
ZMQ_SUBSCRIBE
,
topic_a
,
1
)
);
// Receive subscriptions from subscriber, did not exist anymore
// Receive subscriptions from subscriber, did not exist anymore
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
subscribe_a_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'A'
);
// VERBOSER is set, so subs from both sockets are received
// VERBOSER is set, so subs from both sockets are received
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
subscribe_a_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'A'
);
// Sending A message to make sure everything still works
// Sending A message to make sure everything still works
rc
=
zmq_send_const
(
pub
,
"A"
,
1
,
0
);
send_string_expect_success
(
pub
,
topic_a
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
rc
=
zmq_recv
(
sub0
,
buffer
,
1
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
assert
(
buffer
[
0
]
==
'A'
);
rc
=
zmq_recv
(
sub1
,
buffer
,
1
,
0
);
recv_string_expect_success
(
sub0
,
topic_a
,
0
);
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
recv_string_expect_success
(
sub1
,
topic_a
,
0
);
assert
(
buffer
[
0
]
==
'A'
);
// Unsubscribe for A
// Unsubscribe for A
rc
=
zmq_setsockopt
(
sub1
,
ZMQ_UNSUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub1
,
ZMQ_UNSUBSCRIBE
,
topic_a
,
1
)
);
// Receive unsubscriptions from first subscriber due to VERBOSER
// Receive unsubscriptions from first subscriber due to VERBOSER
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
unsubscribe_a_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
0
);
assert
(
buffer
[
1
]
==
'A'
);
// Unsubscribe for A again from the other socket
// Unsubscribe for A again from the other socket
rc
=
zmq_setsockopt
(
sub0
,
ZMQ_UNSUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub0
,
ZMQ_UNSUBSCRIBE
,
topic_a
,
1
)
);
// Receive unsubscriptions from first subscriber due to VERBOSER
// Receive unsubscriptions from first subscriber due to VERBOSER
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
recv_array_expect_success
(
pub
,
unsubscribe_a_msg
,
0
);
TEST_ASSERT_EQUAL_INT
(
2
,
rc
);
assert
(
buffer
[
0
]
==
0
);
assert
(
buffer
[
1
]
==
'A'
);
// Unsubscribe again to make sure it gets filtered now
// Unsubscribe again to make sure it gets filtered now
rc
=
zmq_setsockopt
(
sub1
,
ZMQ_UNSUBSCRIBE
,
"A"
,
1
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
zmq_setsockopt
(
sub1
,
ZMQ_UNSUBSCRIBE
,
topic_a
,
1
)
);
// Unmatched, so XSUB filters even with VERBOSER
// Unmatched, so XSUB filters even with VERBOSER
rc
=
zmq_recv
(
pub
,
buffer
,
1
,
ZMQ_DONTWAIT
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_recv
(
pub
,
NULL
,
0
,
ZMQ_DONTWAIT
));
TEST_ASSERT_EQUAL_INT
(
-
1
,
rc
);
TEST_ASSERT_EQUAL_INT
(
EAGAIN
,
errno
);
// Clean up.
// Clean up.
rc
=
zmq_close
(
pub
);
test_context_socket_close
(
pub
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
test_context_socket_close
(
sub0
);
rc
=
zmq_close
(
sub0
);
test_context_socket_close
(
sub1
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
rc
=
zmq_close
(
sub1
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
rc
=
zmq_ctx_term
(
ctx
);
TEST_ASSERT_EQUAL_INT
(
0
,
rc
);
}
}
int
main
(
void
)
int
main
()
{
{
setup_test_environment
();
setup_test_environment
();
...
@@ -479,5 +334,5 @@ int main (void)
...
@@ -479,5 +334,5 @@ int main (void)
RUN_TEST
(
test_xpub_verboser_one_sub
);
RUN_TEST
(
test_xpub_verboser_one_sub
);
RUN_TEST
(
test_xpub_verboser_two_subs
);
RUN_TEST
(
test_xpub_verboser_two_subs
);
return
0
;
return
UNITY_END
()
;
}
}
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment