Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
3aca047b
Unverified
Commit
3aca047b
authored
May 15, 2019
by
Simon Giesecke
Committed by
GitHub
May 15, 2019
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #3508 from jean-airoldie/monitor_typed
Problem: Socket monitoring only allows ZMQ_PAIR
parents
e285fe6e
606a8f79
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
128 additions
and
16 deletions
+128
-16
zmq_socket_monitor_versioned.txt
doc/zmq_socket_monitor_versioned.txt
+27
-0
zmq.h
include/zmq.h
+2
-0
socket_base.cpp
src/socket_base.cpp
+20
-2
socket_base.hpp
src/socket_base.hpp
+4
-1
zmq.cpp
src/zmq.cpp
+11
-1
zmq_draft.h
src/zmq_draft.h
+2
-0
test_monitor.cpp
tests/test_monitor.cpp
+62
-12
No files found.
doc/zmq_socket_monitor_versioned.txt
View file @
3aca047b
...
...
@@ -11,6 +11,8 @@ zmq_socket_monitor_versioned - monitor socket events
SYNOPSIS
--------
*int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version');*
*int zmq_socket_monitor_versioned_typed (
void '*socket', char '*endpoint', uint64_t 'events', int 'event_version', int 'type');*
*int zmq_socket_monitor_pipes_stats (void '*socket');*
...
...
@@ -56,6 +58,17 @@ connection uses a bound or connected local endpoint.
Note that the format of the second and further frames, and also the number of
frames, may be different for events added in the future.
The _zmq_socket_monitor_versioned_typed()_ is a generalisation of
_zmq_socket_monitor_versioned_ that supports more monitoring socket types.
The 'type' argument is used to specify the type of the monitoring socket.
Supported types are 'ZMQ_PAIR' (which is the equivalent of
_zmq_socket_monitor_versioned_), 'ZMQ_PUB' and 'ZMQ_PUSH'. Note that consumers
of the events will have to be compatible with the socket type, for instance a
monitoring socket of type 'ZMQ_PUB' will require consumers of type 'ZMQ_SUB'.
In the case that the monitoring socket type is of 'ZMQ_PUB', the multipart
message topic is the event number, thus consumers should subscribe to the
events they want to receive.
The _zmq_socket_monitor_pipes_stats()_ method triggers an event of type
ZMQ_EVENT_PIPES_STATS for each connected peer of the monitored socket.
NOTE: _zmq_socket_monitor_pipes_stats()_ is in DRAFT state.
...
...
@@ -215,6 +228,20 @@ sockets are required to use the inproc:// transport.
The monitor 'endpoint' supplied does not exist.
ERRORS - _zmq_socket_monitor_typed()_
-------------------------------
*ETERM*::
The 0MQ 'context' associated with the specified 'socket' was terminated.
*EPROTONOSUPPORT*::
The transport protocol of the monitor 'endpoint' is not supported. Monitor
sockets are required to use the inproc:// transport.
*EINVAL*::
The monitor 'endpoint' supplied does not exist or the specified socket 'type'
is not supported.
ERRORS - _zmq_socket_monitor_pipes_stats()_
-------------------------------------------
*ENOTSOCK*::
...
...
include/zmq.h
View file @
3aca047b
...
...
@@ -735,6 +735,8 @@ ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_,
const
char
*
addr_
,
uint64_t
events_
,
int
event_version_
);
ZMQ_EXPORT
int
zmq_socket_monitor_versioned_typed
(
void
*
s_
,
const
char
*
addr_
,
uint64_t
events_
,
int
event_version_
,
int
type_
);
ZMQ_EXPORT
int
zmq_socket_monitor_pipes_stats
(
void
*
s
);
#endif // ZMQ_BUILD_DRAFT_API
...
...
src/socket_base.cpp
View file @
3aca047b
...
...
@@ -1639,7 +1639,8 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
int
zmq
::
socket_base_t
::
monitor
(
const
char
*
endpoint_
,
uint64_t
events_
,
int
event_version_
)
int
event_version_
,
int
type_
)
{
scoped_lock_t
lock
(
_monitor_sync
);
...
...
@@ -1670,14 +1671,31 @@ int zmq::socket_base_t::monitor (const char *endpoint_,
errno
=
EPROTONOSUPPORT
;
return
-
1
;
}
// already monitoring. Stop previous monitor before starting new one.
if
(
_monitor_socket
!=
NULL
)
{
stop_monitor
(
true
);
}
// Check if the specified socket type is supported. It must be a
// one-way socket types that support the SNDMORE flag.
switch
(
type_
)
{
case
ZMQ_PAIR
:
break
;
case
ZMQ_PUB
:
break
;
case
ZMQ_PUSH
:
break
;
default
:
errno
=
EINVAL
;
return
-
1
;
}
// Register events to monitor
_monitor_events
=
events_
;
options
.
monitor_event_version
=
event_version_
;
_monitor_socket
=
zmq_socket
(
get_ctx
(),
ZMQ_PAIR
);
// Create a monitor socket of the specified type.
_monitor_socket
=
zmq_socket
(
get_ctx
(),
type_
);
if
(
_monitor_socket
==
NULL
)
return
-
1
;
...
...
src/socket_base.hpp
View file @
3aca047b
...
...
@@ -119,7 +119,10 @@ class socket_base_t : public own_t,
void
lock
();
void
unlock
();
int
monitor
(
const
char
*
endpoint_
,
uint64_t
events_
,
int
event_version_
);
int
monitor
(
const
char
*
endpoint_
,
uint64_t
events_
,
int
event_version_
,
int
type_
);
void
event_connected
(
const
endpoint_uri_pair_t
&
endpoint_uri_pair_
,
zmq
::
fd_t
fd_
);
...
...
src/zmq.cpp
View file @
3aca047b
...
...
@@ -275,7 +275,7 @@ int zmq_socket_monitor_versioned (void *s_,
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
if
(
!
s
)
return
-
1
;
return
s
->
monitor
(
addr_
,
events_
,
event_version_
);
return
s
->
monitor
(
addr_
,
events_
,
event_version_
,
ZMQ_PAIR
);
}
int
zmq_socket_monitor
(
void
*
s_
,
const
char
*
addr_
,
int
events_
)
...
...
@@ -283,6 +283,16 @@ int zmq_socket_monitor (void *s_, const char *addr_, int events_)
return
zmq_socket_monitor_versioned
(
s_
,
addr_
,
events_
,
1
);
}
int
zmq_socket_monitor_versioned_typed
(
void
*
s_
,
const
char
*
addr_
,
uint64_t
events_
,
int
event_version_
,
int
type_
)
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
if
(
!
s
)
return
-
1
;
return
s
->
monitor
(
addr_
,
events_
,
event_version_
,
type_
);
}
int
zmq_join
(
void
*
s_
,
const
char
*
group_
)
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
...
...
src/zmq_draft.h
View file @
3aca047b
...
...
@@ -133,6 +133,8 @@ int zmq_socket_monitor_versioned (void *s_,
const
char
*
addr_
,
uint64_t
events_
,
int
event_version_
);
int
zmq_socket_monitor_versioned_typed
(
void
*
s_
,
const
char
*
addr_
,
uint64_t
events_
,
int
event_version_
,
int
type_
);
int
zmq_socket_monitor_pipes_stats
(
void
*
s_
);
#endif // ZMQ_BUILD_DRAFT_API
...
...
tests/test_monitor.cpp
View file @
3aca047b
...
...
@@ -126,8 +126,21 @@ void test_monitor_basic ()
#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
|| (defined ZMQ_CURRENT_EVENT_VERSION \
&& ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
void
test_monitor_versioned_basic
(
bind_function_t
bind_function_
,
const
char
*
expected_prefix_
)
void
test_monitor_versioned_typed_invalid_socket_type
()
{
void
*
client
=
test_context_socket
(
ZMQ_DEALER
);
// Socket monitoring only works with ZMQ_PAIR, ZMQ_PUB and ZMQ_PUSH.
TEST_ASSERT_FAILURE_ERRNO
(
EINVAL
,
zmq_socket_monitor_versioned_typed
(
client
,
"inproc://invalid-socket-type"
,
0
,
2
,
ZMQ_CLIENT
));
test_context_socket_close_zero_linger
(
client
);
}
void
test_monitor_versioned_typed_basic
(
bind_function_t
bind_function_
,
const
char
*
expected_prefix_
,
int
type_
)
{
char
server_endpoint
[
MAX_SOCKET_STRING
];
...
...
@@ -136,14 +149,36 @@ void test_monitor_versioned_basic (bind_function_t bind_function_,
void
*
server
=
test_context_socket
(
ZMQ_DEALER
);
// Monitor all events on client and server sockets
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_socket_monitor_versioned
(
client
,
"inproc://monitor-client"
,
ZMQ_EVENT_ALL_V2
,
2
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_socket_monitor_versioned
(
server
,
"inproc://monitor-server"
,
ZMQ_EVENT_ALL_V2
,
2
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_socket_monitor_versioned_typed
(
client
,
"inproc://monitor-client"
,
ZMQ_EVENT_ALL_V2
,
2
,
type_
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_socket_monitor_versioned_typed
(
server
,
"inproc://monitor-server"
,
ZMQ_EVENT_ALL_V2
,
2
,
type_
));
// Choose the appropriate consumer socket type.
int
mon_type
;
switch
(
type_
)
{
case
ZMQ_PAIR
:
mon_type
=
ZMQ_PAIR
;
break
;
case
ZMQ_PUSH
:
mon_type
=
ZMQ_PULL
;
break
;
case
ZMQ_PUB
:
mon_type
=
ZMQ_SUB
;
break
;
}
// Create two sockets for collecting monitor events
void
*
client_mon
=
test_context_socket
(
ZMQ_PAIR
);
void
*
server_mon
=
test_context_socket
(
ZMQ_PAIR
);
void
*
client_mon
=
test_context_socket
(
mon_type
);
void
*
server_mon
=
test_context_socket
(
mon_type
);
// Additionally subscribe to all events if a PUB socket is used.
if
(
type_
==
ZMQ_PUB
)
{
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
client_mon
,
ZMQ_SUBSCRIBE
,
""
,
0
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
server_mon
,
ZMQ_SUBSCRIBE
,
""
,
0
));
}
// Connect these to the inproc endpoints so they'll get events
TEST_ASSERT_SUCCESS_ERRNO
(
...
...
@@ -220,30 +255,44 @@ void test_monitor_versioned_basic (bind_function_t bind_function_,
// TODO why does this use zero_linger?
test_context_socket_close_zero_linger
(
client_mon
);
test_context_socket_close_zero_linger
(
server_mon
);
// Wait for the monitor socket's endpoint to be available
// for reuse.
msleep
(
SETTLE_TIME
);
}
void
test_monitor_versioned_basic_tcp_ipv4
()
{
static
const
char
prefix
[]
=
"tcp://127.0.0.1:"
;
test_monitor_versioned_basic
(
bind_loopback_ipv4
,
prefix
);
// Calling 'monitor_versioned_typed' with ZMQ_PAIR is the equivalent of
// calling 'monitor_versioned'.
test_monitor_versioned_typed_basic
(
bind_loopback_ipv4
,
prefix
,
ZMQ_PAIR
);
test_monitor_versioned_typed_basic
(
bind_loopback_ipv4
,
prefix
,
ZMQ_PUB
);
test_monitor_versioned_typed_basic
(
bind_loopback_ipv4
,
prefix
,
ZMQ_PUSH
);
}
void
test_monitor_versioned_basic_tcp_ipv6
()
{
static
const
char
prefix
[]
=
"tcp://[::1]:"
;
test_monitor_versioned_basic
(
bind_loopback_ipv6
,
prefix
);
test_monitor_versioned_typed_basic
(
bind_loopback_ipv6
,
prefix
,
ZMQ_PAIR
);
test_monitor_versioned_typed_basic
(
bind_loopback_ipv6
,
prefix
,
ZMQ_PUB
);
test_monitor_versioned_typed_basic
(
bind_loopback_ipv6
,
prefix
,
ZMQ_PUSH
);
}
void
test_monitor_versioned_basic_ipc
()
{
static
const
char
prefix
[]
=
"ipc://"
;
test_monitor_versioned_basic
(
bind_loopback_ipc
,
prefix
);
test_monitor_versioned_typed_basic
(
bind_loopback_ipc
,
prefix
,
ZMQ_PAIR
);
test_monitor_versioned_typed_basic
(
bind_loopback_ipc
,
prefix
,
ZMQ_PUB
);
test_monitor_versioned_typed_basic
(
bind_loopback_ipc
,
prefix
,
ZMQ_PUSH
);
}
void
test_monitor_versioned_basic_tipc
()
{
static
const
char
prefix
[]
=
"tipc://"
;
test_monitor_versioned_basic
(
bind_loopback_tipc
,
prefix
);
test_monitor_versioned_typed_basic
(
bind_loopback_tipc
,
prefix
,
ZMQ_PAIR
);
test_monitor_versioned_typed_basic
(
bind_loopback_tipc
,
prefix
,
ZMQ_PUB
);
test_monitor_versioned_typed_basic
(
bind_loopback_tipc
,
prefix
,
ZMQ_PUSH
);
}
#ifdef ZMQ_EVENT_PIPES_STATS
...
...
@@ -385,6 +434,7 @@ int main ()
#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
|| (defined ZMQ_CURRENT_EVENT_VERSION \
&& ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
RUN_TEST
(
test_monitor_versioned_typed_invalid_socket_type
);
RUN_TEST
(
test_monitor_versioned_basic_tcp_ipv4
);
RUN_TEST
(
test_monitor_versioned_basic_tcp_ipv6
);
RUN_TEST
(
test_monitor_versioned_basic_ipc
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment