Commit 606a8f79 authored by jean-airoldie's avatar jean-airoldie

Problem: Socket monitoring only allows ZMQ_PAIR

Solution: Allow ZMQ_PUB and ZMQ_PUSH sockets types for the monitoring.
This way someone could create a ZMQ_PULL socket connected to multiple
monitoring sockets at the same time.
parent e285fe6e
...@@ -11,6 +11,8 @@ zmq_socket_monitor_versioned - monitor socket events ...@@ -11,6 +11,8 @@ zmq_socket_monitor_versioned - monitor socket events
SYNOPSIS SYNOPSIS
-------- --------
*int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version');* *int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version');*
*int zmq_socket_monitor_versioned_typed (
void '*socket', char '*endpoint', uint64_t 'events', int 'event_version', int 'type');*
*int zmq_socket_monitor_pipes_stats (void '*socket');* *int zmq_socket_monitor_pipes_stats (void '*socket');*
...@@ -56,6 +58,17 @@ connection uses a bound or connected local endpoint. ...@@ -56,6 +58,17 @@ connection uses a bound or connected local endpoint.
Note that the format of the second and further frames, and also the number of Note that the format of the second and further frames, and also the number of
frames, may be different for events added in the future. frames, may be different for events added in the future.
The _zmq_socket_monitor_versioned_typed()_ is a generalisation of
_zmq_socket_monitor_versioned_ that supports more monitoring socket types.
The 'type' argument is used to specify the type of the monitoring socket.
Supported types are 'ZMQ_PAIR' (which is the equivalent of
_zmq_socket_monitor_versioned_), 'ZMQ_PUB' and 'ZMQ_PUSH'. Note that consumers
of the events will have to be compatible with the socket type, for instance a
monitoring socket of type 'ZMQ_PUB' will require consumers of type 'ZMQ_SUB'.
In the case that the monitoring socket type is of 'ZMQ_PUB', the multipart
message topic is the event number, thus consumers should subscribe to the
events they want to receive.
The _zmq_socket_monitor_pipes_stats()_ method triggers an event of type The _zmq_socket_monitor_pipes_stats()_ method triggers an event of type
ZMQ_EVENT_PIPES_STATS for each connected peer of the monitored socket. ZMQ_EVENT_PIPES_STATS for each connected peer of the monitored socket.
NOTE: _zmq_socket_monitor_pipes_stats()_ is in DRAFT state. NOTE: _zmq_socket_monitor_pipes_stats()_ is in DRAFT state.
...@@ -215,6 +228,20 @@ sockets are required to use the inproc:// transport. ...@@ -215,6 +228,20 @@ sockets are required to use the inproc:// transport.
The monitor 'endpoint' supplied does not exist. The monitor 'endpoint' supplied does not exist.
ERRORS - _zmq_socket_monitor_typed()_
-------------------------------
*ETERM*::
The 0MQ 'context' associated with the specified 'socket' was terminated.
*EPROTONOSUPPORT*::
The transport protocol of the monitor 'endpoint' is not supported. Monitor
sockets are required to use the inproc:// transport.
*EINVAL*::
The monitor 'endpoint' supplied does not exist or the specified socket 'type'
is not supported.
ERRORS - _zmq_socket_monitor_pipes_stats()_ ERRORS - _zmq_socket_monitor_pipes_stats()_
------------------------------------------- -------------------------------------------
*ENOTSOCK*:: *ENOTSOCK*::
......
...@@ -735,6 +735,8 @@ ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_, ...@@ -735,6 +735,8 @@ ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_,
const char *addr_, const char *addr_,
uint64_t events_, uint64_t events_,
int event_version_); int event_version_);
ZMQ_EXPORT int zmq_socket_monitor_versioned_typed (
void *s_, const char *addr_, uint64_t events_, int event_version_, int type_);
ZMQ_EXPORT int zmq_socket_monitor_pipes_stats (void *s); ZMQ_EXPORT int zmq_socket_monitor_pipes_stats (void *s);
#endif // ZMQ_BUILD_DRAFT_API #endif // ZMQ_BUILD_DRAFT_API
......
...@@ -1639,7 +1639,8 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) ...@@ -1639,7 +1639,8 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
int zmq::socket_base_t::monitor (const char *endpoint_, int zmq::socket_base_t::monitor (const char *endpoint_,
uint64_t events_, uint64_t events_,
int event_version_) int event_version_,
int type_)
{ {
scoped_lock_t lock (_monitor_sync); scoped_lock_t lock (_monitor_sync);
...@@ -1670,14 +1671,31 @@ int zmq::socket_base_t::monitor (const char *endpoint_, ...@@ -1670,14 +1671,31 @@ int zmq::socket_base_t::monitor (const char *endpoint_,
errno = EPROTONOSUPPORT; errno = EPROTONOSUPPORT;
return -1; return -1;
} }
// already monitoring. Stop previous monitor before starting new one. // already monitoring. Stop previous monitor before starting new one.
if (_monitor_socket != NULL) { if (_monitor_socket != NULL) {
stop_monitor (true); stop_monitor (true);
} }
// Check if the specified socket type is supported. It must be a
// one-way socket types that support the SNDMORE flag.
switch (type_) {
case ZMQ_PAIR:
break;
case ZMQ_PUB:
break;
case ZMQ_PUSH:
break;
default:
errno = EINVAL;
return -1;
}
// Register events to monitor // Register events to monitor
_monitor_events = events_; _monitor_events = events_;
options.monitor_event_version = event_version_; options.monitor_event_version = event_version_;
_monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR); // Create a monitor socket of the specified type.
_monitor_socket = zmq_socket (get_ctx (), type_);
if (_monitor_socket == NULL) if (_monitor_socket == NULL)
return -1; return -1;
......
...@@ -119,7 +119,10 @@ class socket_base_t : public own_t, ...@@ -119,7 +119,10 @@ class socket_base_t : public own_t,
void lock (); void lock ();
void unlock (); void unlock ();
int monitor (const char *endpoint_, uint64_t events_, int event_version_); int monitor (const char *endpoint_,
uint64_t events_,
int event_version_,
int type_);
void event_connected (const endpoint_uri_pair_t &endpoint_uri_pair_, void event_connected (const endpoint_uri_pair_t &endpoint_uri_pair_,
zmq::fd_t fd_); zmq::fd_t fd_);
......
...@@ -275,7 +275,7 @@ int zmq_socket_monitor_versioned (void *s_, ...@@ -275,7 +275,7 @@ int zmq_socket_monitor_versioned (void *s_,
zmq::socket_base_t *s = as_socket_base_t (s_); zmq::socket_base_t *s = as_socket_base_t (s_);
if (!s) if (!s)
return -1; return -1;
return s->monitor (addr_, events_, event_version_); return s->monitor (addr_, events_, event_version_, ZMQ_PAIR);
} }
int zmq_socket_monitor (void *s_, const char *addr_, int events_) int zmq_socket_monitor (void *s_, const char *addr_, int events_)
...@@ -283,6 +283,16 @@ int zmq_socket_monitor (void *s_, const char *addr_, int events_) ...@@ -283,6 +283,16 @@ int zmq_socket_monitor (void *s_, const char *addr_, int events_)
return zmq_socket_monitor_versioned (s_, addr_, events_, 1); return zmq_socket_monitor_versioned (s_, addr_, events_, 1);
} }
int zmq_socket_monitor_versioned_typed (
void *s_, const char *addr_, uint64_t events_, int event_version_, int type_)
{
zmq::socket_base_t *s = as_socket_base_t (s_);
if (!s)
return -1;
return s->monitor (addr_, events_, event_version_, type_);
}
int zmq_join (void *s_, const char *group_) int zmq_join (void *s_, const char *group_)
{ {
zmq::socket_base_t *s = as_socket_base_t (s_); zmq::socket_base_t *s = as_socket_base_t (s_);
......
...@@ -133,6 +133,8 @@ int zmq_socket_monitor_versioned (void *s_, ...@@ -133,6 +133,8 @@ int zmq_socket_monitor_versioned (void *s_,
const char *addr_, const char *addr_,
uint64_t events_, uint64_t events_,
int event_version_); int event_version_);
int zmq_socket_monitor_versioned_typed (
void *s_, const char *addr_, uint64_t events_, int event_version_, int type_);
int zmq_socket_monitor_pipes_stats (void *s_); int zmq_socket_monitor_pipes_stats (void *s_);
#endif // ZMQ_BUILD_DRAFT_API #endif // ZMQ_BUILD_DRAFT_API
......
...@@ -126,8 +126,21 @@ void test_monitor_basic () ...@@ -126,8 +126,21 @@ void test_monitor_basic ()
#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \ #if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
|| (defined ZMQ_CURRENT_EVENT_VERSION \ || (defined ZMQ_CURRENT_EVENT_VERSION \
&& ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
void test_monitor_versioned_basic (bind_function_t bind_function_, void test_monitor_versioned_typed_invalid_socket_type ()
const char *expected_prefix_) {
void *client = test_context_socket (ZMQ_DEALER);
// Socket monitoring only works with ZMQ_PAIR, ZMQ_PUB and ZMQ_PUSH.
TEST_ASSERT_FAILURE_ERRNO (
EINVAL, zmq_socket_monitor_versioned_typed (
client, "inproc://invalid-socket-type", 0, 2, ZMQ_CLIENT));
test_context_socket_close_zero_linger (client);
}
void test_monitor_versioned_typed_basic (bind_function_t bind_function_,
const char *expected_prefix_,
int type_)
{ {
char server_endpoint[MAX_SOCKET_STRING]; char server_endpoint[MAX_SOCKET_STRING];
...@@ -136,14 +149,36 @@ void test_monitor_versioned_basic (bind_function_t bind_function_, ...@@ -136,14 +149,36 @@ void test_monitor_versioned_basic (bind_function_t bind_function_,
void *server = test_context_socket (ZMQ_DEALER); void *server = test_context_socket (ZMQ_DEALER);
// Monitor all events on client and server sockets // Monitor all events on client and server sockets
TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned ( TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned_typed (
client, "inproc://monitor-client", ZMQ_EVENT_ALL_V2, 2)); client, "inproc://monitor-client", ZMQ_EVENT_ALL_V2, 2, type_));
TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned ( TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned_typed (
server, "inproc://monitor-server", ZMQ_EVENT_ALL_V2, 2)); server, "inproc://monitor-server", ZMQ_EVENT_ALL_V2, 2, type_));
// Choose the appropriate consumer socket type.
int mon_type;
switch (type_) {
case ZMQ_PAIR:
mon_type = ZMQ_PAIR;
break;
case ZMQ_PUSH:
mon_type = ZMQ_PULL;
break;
case ZMQ_PUB:
mon_type = ZMQ_SUB;
break;
}
// Create two sockets for collecting monitor events // Create two sockets for collecting monitor events
void *client_mon = test_context_socket (ZMQ_PAIR); void *client_mon = test_context_socket (mon_type);
void *server_mon = test_context_socket (ZMQ_PAIR); void *server_mon = test_context_socket (mon_type);
// Additionally subscribe to all events if a PUB socket is used.
if (type_ == ZMQ_PUB) {
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (client_mon, ZMQ_SUBSCRIBE, "", 0));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (server_mon, ZMQ_SUBSCRIBE, "", 0));
}
// Connect these to the inproc endpoints so they'll get events // Connect these to the inproc endpoints so they'll get events
TEST_ASSERT_SUCCESS_ERRNO ( TEST_ASSERT_SUCCESS_ERRNO (
...@@ -220,30 +255,44 @@ void test_monitor_versioned_basic (bind_function_t bind_function_, ...@@ -220,30 +255,44 @@ void test_monitor_versioned_basic (bind_function_t bind_function_,
// TODO why does this use zero_linger? // TODO why does this use zero_linger?
test_context_socket_close_zero_linger (client_mon); test_context_socket_close_zero_linger (client_mon);
test_context_socket_close_zero_linger (server_mon); test_context_socket_close_zero_linger (server_mon);
// Wait for the monitor socket's endpoint to be available
// for reuse.
msleep (SETTLE_TIME);
} }
void test_monitor_versioned_basic_tcp_ipv4 () void test_monitor_versioned_basic_tcp_ipv4 ()
{ {
static const char prefix[] = "tcp://127.0.0.1:"; static const char prefix[] = "tcp://127.0.0.1:";
test_monitor_versioned_basic (bind_loopback_ipv4, prefix); // Calling 'monitor_versioned_typed' with ZMQ_PAIR is the equivalent of
// calling 'monitor_versioned'.
test_monitor_versioned_typed_basic (bind_loopback_ipv4, prefix, ZMQ_PAIR);
test_monitor_versioned_typed_basic (bind_loopback_ipv4, prefix, ZMQ_PUB);
test_monitor_versioned_typed_basic (bind_loopback_ipv4, prefix, ZMQ_PUSH);
} }
void test_monitor_versioned_basic_tcp_ipv6 () void test_monitor_versioned_basic_tcp_ipv6 ()
{ {
static const char prefix[] = "tcp://[::1]:"; static const char prefix[] = "tcp://[::1]:";
test_monitor_versioned_basic (bind_loopback_ipv6, prefix); test_monitor_versioned_typed_basic (bind_loopback_ipv6, prefix, ZMQ_PAIR);
test_monitor_versioned_typed_basic (bind_loopback_ipv6, prefix, ZMQ_PUB);
test_monitor_versioned_typed_basic (bind_loopback_ipv6, prefix, ZMQ_PUSH);
} }
void test_monitor_versioned_basic_ipc () void test_monitor_versioned_basic_ipc ()
{ {
static const char prefix[] = "ipc://"; static const char prefix[] = "ipc://";
test_monitor_versioned_basic (bind_loopback_ipc, prefix); test_monitor_versioned_typed_basic (bind_loopback_ipc, prefix, ZMQ_PAIR);
test_monitor_versioned_typed_basic (bind_loopback_ipc, prefix, ZMQ_PUB);
test_monitor_versioned_typed_basic (bind_loopback_ipc, prefix, ZMQ_PUSH);
} }
void test_monitor_versioned_basic_tipc () void test_monitor_versioned_basic_tipc ()
{ {
static const char prefix[] = "tipc://"; static const char prefix[] = "tipc://";
test_monitor_versioned_basic (bind_loopback_tipc, prefix); test_monitor_versioned_typed_basic (bind_loopback_tipc, prefix, ZMQ_PAIR);
test_monitor_versioned_typed_basic (bind_loopback_tipc, prefix, ZMQ_PUB);
test_monitor_versioned_typed_basic (bind_loopback_tipc, prefix, ZMQ_PUSH);
} }
#ifdef ZMQ_EVENT_PIPES_STATS #ifdef ZMQ_EVENT_PIPES_STATS
...@@ -385,6 +434,7 @@ int main () ...@@ -385,6 +434,7 @@ int main ()
#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \ #if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
|| (defined ZMQ_CURRENT_EVENT_VERSION \ || (defined ZMQ_CURRENT_EVENT_VERSION \
&& ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
RUN_TEST (test_monitor_versioned_typed_invalid_socket_type);
RUN_TEST (test_monitor_versioned_basic_tcp_ipv4); RUN_TEST (test_monitor_versioned_basic_tcp_ipv4);
RUN_TEST (test_monitor_versioned_basic_tcp_ipv6); RUN_TEST (test_monitor_versioned_basic_tcp_ipv6);
RUN_TEST (test_monitor_versioned_basic_ipc); RUN_TEST (test_monitor_versioned_basic_ipc);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment