Unverified Commit 9cb266ed authored by Doron Somech's avatar Doron Somech Committed by GitHub

Merge pull request #3512 from bluca/fixes

Problems: typos in docs, unnecessary ifdefs, duplicated APIs
parents 2f98f703 19ff4d0b
......@@ -966,6 +966,7 @@ test_apps += tests/test_poller \
tests/test_scatter_gather \
tests/test_dgram \
tests/test_app_meta \
tests/test_xpub_manual_last_value \
tests/test_router_notify
tests_test_poller_SOURCES = tests/test_poller.cpp
......@@ -996,6 +997,10 @@ tests_test_dgram_SOURCES = tests/test_dgram.cpp
tests_test_dgram_LDADD = src/libzmq.la ${TESTUTIL_LIBS}
tests_test_dgram_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_xpub_manual_last_value_SOURCES = tests/test_xpub_manual_last_value.cpp
tests_test_xpub_manual_last_value_LDADD = src/libzmq.la ${TESTUTIL_LIBS}
tests_test_xpub_manual_last_value_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_app_meta_SOURCES = tests/test_app_meta.cpp
tests_test_app_meta_LDADD = src/libzmq.la ${TESTUTIL_LIBS}
tests_test_app_meta_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
......
......@@ -1074,12 +1074,12 @@ Applicable socket types:: ZMQ_XPUB
ZMQ_XPUB_MANUAL_LAST_VALUE: change the subscription handling to manual
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This option is similar to ZMQ_XPUB_MANUAL.
What is the difference, ZMQ_XPUB_MANUAL_LAST_VALUE sets the 'XPUB' socket
behaviour to send the first message to the last subscriber after the 'XPUB' socket
recieve a subscription and call setsockopt with ZMQ_SUBSCRIBE on 'XPUB' socket.
This prevent duplicated message when use last value caching(LVC).
The difference is that ZMQ_XPUB_MANUAL_LAST_VALUE changes the 'XPUB' socket
behaviour to send the first message to the last subscriber after the socket
receives a subscription and call setsockopt with ZMQ_SUBSCRIBE on 'XPUB' socket.
This prevents duplicated messages when using last value caching(LVC).
NOTE: in DRAFT state, not yet available in stable releases.
......
......@@ -10,9 +10,7 @@ zmq_socket_monitor_versioned - monitor socket events
SYNOPSIS
--------
*int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version');*
*int zmq_socket_monitor_versioned_typed (
void '*socket', char '*endpoint', uint64_t 'events', int 'event_version', int 'type');*
*int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version', int 'type');*
*int zmq_socket_monitor_pipes_stats (void '*socket');*
......@@ -58,11 +56,8 @@ connection uses a bound or connected local endpoint.
Note that the format of the second and further frames, and also the number of
frames, may be different for events added in the future.
The _zmq_socket_monitor_versioned_typed()_ is a generalisation of
_zmq_socket_monitor_versioned_ that supports more monitoring socket types.
The 'type' argument is used to specify the type of the monitoring socket.
Supported types are 'ZMQ_PAIR' (which is the equivalent of
_zmq_socket_monitor_versioned_), 'ZMQ_PUB' and 'ZMQ_PUSH'. Note that consumers
Supported types are 'ZMQ_PAIR', 'ZMQ_PUB' and 'ZMQ_PUSH'. Note that consumers
of the events will have to be compatible with the socket type, for instance a
monitoring socket of type 'ZMQ_PUB' will require consumers of type 'ZMQ_SUB'.
In the case that the monitoring socket type is of 'ZMQ_PUB', the multipart
......@@ -224,19 +219,6 @@ The 0MQ 'context' associated with the specified 'socket' was terminated.
The transport protocol of the monitor 'endpoint' is not supported. Monitor
sockets are required to use the inproc:// transport.
*EINVAL*::
The monitor 'endpoint' supplied does not exist.
ERRORS - _zmq_socket_monitor_typed()_
-------------------------------
*ETERM*::
The 0MQ 'context' associated with the specified 'socket' was terminated.
*EPROTONOSUPPORT*::
The transport protocol of the monitor 'endpoint' is not supported. Monitor
sockets are required to use the inproc:// transport.
*EINVAL*::
The monitor 'endpoint' supplied does not exist or the specified socket 'type'
is not supported.
......
......@@ -732,11 +732,7 @@ ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket,
#define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS
ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_,
const char *addr_,
uint64_t events_,
int event_version_);
ZMQ_EXPORT int zmq_socket_monitor_versioned_typed (
ZMQ_EXPORT int zmq_socket_monitor_versioned (
void *s_, const char *addr_, uint64_t events_, int event_version_, int type_);
ZMQ_EXPORT int zmq_socket_monitor_pipes_stats (void *s);
......
......@@ -44,9 +44,7 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
_more (false),
_lossy (true),
_manual (false),
#ifdef ZMQ_BUILD_DRAFT_API
_send_last_pipe (false),
#endif
_pending_pipes (),
_welcome_msg ()
{
......@@ -193,10 +191,8 @@ int zmq::xpub_t::xsetsockopt (int option_,
size_t optvallen_)
{
if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSER
#ifdef ZMQ_BUILD_DRAFT_API
|| option_ == ZMQ_XPUB_MANUAL_LAST_VALUE
#endif
|| option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL) {
|| option_ == ZMQ_XPUB_MANUAL_LAST_VALUE || option_ == ZMQ_XPUB_NODROP
|| option_ == ZMQ_XPUB_MANUAL) {
if (optvallen_ != sizeof (int)
|| *static_cast<const int *> (optval_) < 0) {
errno = EINVAL;
......@@ -208,11 +204,9 @@ int zmq::xpub_t::xsetsockopt (int option_,
} else if (option_ == ZMQ_XPUB_VERBOSER) {
_verbose_subs = (*static_cast<const int *> (optval_) != 0);
_verbose_unsubs = _verbose_subs;
#ifdef ZMQ_BUILD_DRAFT_API
} else if (option_ == ZMQ_XPUB_MANUAL_LAST_VALUE) {
_manual = (*static_cast<const int *> (optval_) != 0);
_send_last_pipe = _manual;
#endif
} else if (option_ == ZMQ_XPUB_NODROP)
_lossy = (*static_cast<const int *> (optval_) == 0);
else if (option_ == ZMQ_XPUB_MANUAL)
......@@ -276,13 +270,11 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, xpub_t *self_)
self_->_dist.match (pipe_);
}
#ifdef ZMQ_BUILD_DRAFT_API
void zmq::xpub_t::mark_last_pipe_as_matching (pipe_t *pipe_, xpub_t *self_)
{
if (self_->_last_pipe == pipe_)
self_->_dist.match (pipe_);
}
#endif
int zmq::xpub_t::xsend (msg_t *msg_)
{
......@@ -290,8 +282,7 @@ int zmq::xpub_t::xsend (msg_t *msg_)
// For the first part of multi-part message, find the matching pipes.
if (!_more) {
#ifdef ZMQ_BUILD_DRAFT_API
if (_manual && _last_pipe && _send_last_pipe) {
if (unlikely (_manual && _last_pipe && _send_last_pipe)) {
_subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
msg_->size (), mark_last_pipe_as_matching,
this);
......@@ -299,10 +290,6 @@ int zmq::xpub_t::xsend (msg_t *msg_)
} else
_subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
msg_->size (), mark_as_matching, this);
#else
_subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
msg_->size (), mark_as_matching, this);
#endif
// If inverted matching is used, reverse the selection now
if (options.invert_matching) {
_dist.reverse_match ();
......
......@@ -99,13 +99,11 @@ class xpub_t : public socket_base_t
// Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE
bool _manual;
#ifdef ZMQ_BUILD_DRAFT_API
// Send message to the last pipe, only used if xpub is on manual and after calling set option with ZMQ_SUBSCRIBE
bool _send_last_pipe;
// Function to be applied to match the last pipe.
static void mark_last_pipe_as_matching (zmq::pipe_t *pipe_, xpub_t *arg_);
#endif
// Last pipe that sent subscription message, only used if xpub is on manual
pipe_t *_last_pipe;
......
......@@ -267,30 +267,18 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
return s->getsockopt (option_, optval_, optvallen_);
}
int zmq_socket_monitor_versioned (void *s_,
const char *addr_,
uint64_t events_,
int event_version_)
int zmq_socket_monitor_versioned (
void *s_, const char *addr_, uint64_t events_, int event_version_, int type_)
{
zmq::socket_base_t *s = as_socket_base_t (s_);
if (!s)
return -1;
return s->monitor (addr_, events_, event_version_, ZMQ_PAIR);
return s->monitor (addr_, events_, event_version_, type_);
}
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
{
return zmq_socket_monitor_versioned (s_, addr_, events_, 1);
}
int zmq_socket_monitor_versioned_typed (
void *s_, const char *addr_, uint64_t events_, int event_version_, int type_)
{
zmq::socket_base_t *s = as_socket_base_t (s_);
if (!s)
return -1;
return s->monitor (addr_, events_, event_version_, type_);
return zmq_socket_monitor_versioned (s_, addr_, events_, 1, ZMQ_PAIR);
}
int zmq_join (void *s_, const char *group_)
......
......@@ -130,11 +130,7 @@ int zmq_socket_get_peer_state (void *socket_,
#define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS
int zmq_socket_monitor_versioned (void *s_,
const char *addr_,
uint64_t events_,
int event_version_);
int zmq_socket_monitor_versioned_typed (
int zmq_socket_monitor_versioned (
void *s_, const char *addr_, uint64_t events_, int event_version_, int type_);
int zmq_socket_monitor_pipes_stats (void *s_);
......
......@@ -126,21 +126,21 @@ void test_monitor_basic ()
#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
|| (defined ZMQ_CURRENT_EVENT_VERSION \
&& ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
void test_monitor_versioned_typed_invalid_socket_type ()
void test_monitor_versioned_invalid_socket_type ()
{
void *client = test_context_socket (ZMQ_DEALER);
// Socket monitoring only works with ZMQ_PAIR, ZMQ_PUB and ZMQ_PUSH.
TEST_ASSERT_FAILURE_ERRNO (
EINVAL, zmq_socket_monitor_versioned_typed (
EINVAL, zmq_socket_monitor_versioned (
client, "inproc://invalid-socket-type", 0, 2, ZMQ_CLIENT));
test_context_socket_close_zero_linger (client);
}
void test_monitor_versioned_typed_basic (bind_function_t bind_function_,
const char *expected_prefix_,
int type_)
void test_monitor_versioned_basic (bind_function_t bind_function_,
const char *expected_prefix_,
int type_)
{
char server_endpoint[MAX_SOCKET_STRING];
char client_mon_endpoint[MAX_SOCKET_STRING];
......@@ -158,9 +158,9 @@ void test_monitor_versioned_typed_basic (bind_function_t bind_function_,
void *server = test_context_socket (ZMQ_DEALER);
// Monitor all events on client and server sockets
TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned_typed (
TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned (
client, client_mon_endpoint, ZMQ_EVENT_ALL_V2, 2, type_));
TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned_typed (
TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned (
server, server_mon_endpoint, ZMQ_EVENT_ALL_V2, 2, type_));
// Choose the appropriate consumer socket type.
......@@ -267,35 +267,33 @@ void test_monitor_versioned_typed_basic (bind_function_t bind_function_,
void test_monitor_versioned_basic_tcp_ipv4 ()
{
static const char prefix[] = "tcp://127.0.0.1:";
// Calling 'monitor_versioned_typed' with ZMQ_PAIR is the equivalent of
// calling 'monitor_versioned'.
test_monitor_versioned_typed_basic (bind_loopback_ipv4, prefix, ZMQ_PAIR);
test_monitor_versioned_typed_basic (bind_loopback_ipv4, prefix, ZMQ_PUB);
test_monitor_versioned_typed_basic (bind_loopback_ipv4, prefix, ZMQ_PUSH);
test_monitor_versioned_basic (bind_loopback_ipv4, prefix, ZMQ_PAIR);
test_monitor_versioned_basic (bind_loopback_ipv4, prefix, ZMQ_PUB);
test_monitor_versioned_basic (bind_loopback_ipv4, prefix, ZMQ_PUSH);
}
void test_monitor_versioned_basic_tcp_ipv6 ()
{
static const char prefix[] = "tcp://[::1]:";
test_monitor_versioned_typed_basic (bind_loopback_ipv6, prefix, ZMQ_PAIR);
test_monitor_versioned_typed_basic (bind_loopback_ipv6, prefix, ZMQ_PUB);
test_monitor_versioned_typed_basic (bind_loopback_ipv6, prefix, ZMQ_PUSH);
test_monitor_versioned_basic (bind_loopback_ipv6, prefix, ZMQ_PAIR);
test_monitor_versioned_basic (bind_loopback_ipv6, prefix, ZMQ_PUB);
test_monitor_versioned_basic (bind_loopback_ipv6, prefix, ZMQ_PUSH);
}
void test_monitor_versioned_basic_ipc ()
{
static const char prefix[] = "ipc://";
test_monitor_versioned_typed_basic (bind_loopback_ipc, prefix, ZMQ_PAIR);
test_monitor_versioned_typed_basic (bind_loopback_ipc, prefix, ZMQ_PUB);
test_monitor_versioned_typed_basic (bind_loopback_ipc, prefix, ZMQ_PUSH);
test_monitor_versioned_basic (bind_loopback_ipc, prefix, ZMQ_PAIR);
test_monitor_versioned_basic (bind_loopback_ipc, prefix, ZMQ_PUB);
test_monitor_versioned_basic (bind_loopback_ipc, prefix, ZMQ_PUSH);
}
void test_monitor_versioned_basic_tipc ()
{
static const char prefix[] = "tipc://";
test_monitor_versioned_typed_basic (bind_loopback_tipc, prefix, ZMQ_PAIR);
test_monitor_versioned_typed_basic (bind_loopback_tipc, prefix, ZMQ_PUB);
test_monitor_versioned_typed_basic (bind_loopback_tipc, prefix, ZMQ_PUSH);
test_monitor_versioned_basic (bind_loopback_tipc, prefix, ZMQ_PAIR);
test_monitor_versioned_basic (bind_loopback_tipc, prefix, ZMQ_PUB);
test_monitor_versioned_basic (bind_loopback_tipc, prefix, ZMQ_PUSH);
}
#ifdef ZMQ_EVENT_PIPES_STATS
......@@ -310,7 +308,7 @@ void test_monitor_versioned_stats (bind_function_t bind_function_,
void *push = test_context_socket (ZMQ_PUSH);
TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned (
push, "inproc://monitor-push", ZMQ_EVENT_PIPES_STATS, 2));
push, "inproc://monitor-push", ZMQ_EVENT_PIPES_STATS, 2, ZMQ_PAIR));
// Should fail if there are no pipes to monitor
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_socket_monitor_pipes_stats (push));
......@@ -437,7 +435,7 @@ int main ()
#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
|| (defined ZMQ_CURRENT_EVENT_VERSION \
&& ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
RUN_TEST (test_monitor_versioned_typed_invalid_socket_type);
RUN_TEST (test_monitor_versioned_invalid_socket_type);
RUN_TEST (test_monitor_versioned_basic_tcp_ipv4);
RUN_TEST (test_monitor_versioned_basic_tcp_ipv6);
RUN_TEST (test_monitor_versioned_basic_ipc);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment