Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
2a209140
Commit
2a209140
authored
Sep 25, 2012
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #426 from methodmissing/fix-monitor
Fix monitor
parents
a6c6054e
b1776e23
Hide whitespace changes
Inline
Side-by-side
Showing
19 changed files
with
612 additions
and
275 deletions
+612
-275
Makefile.am
doc/Makefile.am
+2
-2
zmq_socket_monitor.txt
doc/zmq_socket_monitor.txt
+132
-67
zmq.h
include/zmq.h
+12
-7
ctx.cpp
src/ctx.cpp
+1
-69
ctx.hpp
src/ctx.hpp
+0
-8
ipc_connecter.cpp
src/ipc_connecter.cpp
+5
-4
ipc_connecter.hpp
src/ipc_connecter.hpp
+3
-0
ipc_listener.cpp
src/ipc_listener.cpp
+5
-5
session_base.cpp
src/session_base.cpp
+2
-11
session_base.hpp
src/session_base.hpp
+2
-2
socket_base.cpp
src/socket_base.cpp
+172
-10
socket_base.hpp
src/socket_base.hpp
+24
-2
stream_engine.cpp
src/stream_engine.cpp
+4
-2
stream_engine.hpp
src/stream_engine.hpp
+4
-0
tcp_connecter.cpp
src/tcp_connecter.cpp
+5
-4
tcp_connecter.hpp
src/tcp_connecter.hpp
+3
-0
tcp_listener.cpp
src/tcp_listener.cpp
+4
-4
zmq.cpp
src/zmq.cpp
+11
-9
test_monitor.cpp
tests/test_monitor.cpp
+221
-69
No files found.
doc/Makefile.am
View file @
2a209140
MAN3
=
zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_proxy.3
\
zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3
\
zmq_init.3 zmq_term.3
zmq_ctx_set_monitor.3
\
zmq_init.3 zmq_term.3
\
zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3
\
zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3
\
zmq_msg_send.3 zmq_msg_recv.3
\
zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3
\
zmq_strerror.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3
\
zmq_s
ocket_monitor.3 zmq_s
trerror.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3
\
zmq_sendmsg.3 zmq_recvmsg.3 zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3
MAN7
=
zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7
...
...
doc/zmq_
ctx_s
et_monitor.txt
→
doc/zmq_
sock
et_monitor.txt
View file @
2a209140
zmq_ctx_set_monitor(3)
======================
zmq_ctx_s
ock
et_monitor(3)
======================
===
NAME
----
zmq_
ctx_s
et_monitor - register a monitoring callback
zmq_
sock
et_monitor - register a monitoring callback
SYNOPSIS
--------
*int zmq_
ctx_set_monitor (void '*context', zmq_monitor_fn '*monitor
');*
*int zmq_
socket_monitor (void '*socket', char * '*addr', int 'events
');*
DESCRIPTION
-----------
The _zmq_ctx_set_monitor()_ function shall register a callback function
specified by the 'monitor' argument. This is an event sink for changes in per
socket connection and mailbox (work in progress) states.
The _zmq_socket_monitor()_ function shall spawn a 'PAIR' socket that publishes
socket state changes (events) over the inproc:// transport to a given endpoint.
Messages are 'zmq_event_t' structs. It's recommended to connect via a 'PAIR'
socket in another application thread and handle monitoring events there. It's
possible to also supply a bitmask ('ZMQ_EVENT_ALL' or any combination of the
'ZMQ_EVENT_*' constants) of the events you're interested in.
.The _zmq_ctx_set_monitor()_ callback function is expected to have this
prototype:
----
typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data);
----
// monitoring thread
static void *req_socket_monitor (void *ctx)
{
zmq_event_t event;
int rc;
void *s = zmq_socket (ctx, ZMQ_PAIR);
assert (s);
rc = zmq_connect (s, "inproc://monitor.req");
assert (rc == 0);
while (true) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0);
if (rc == -1 && zmq_errno() == ETERM) break;
assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
switch (event.event) {
case ZMQ_EVENT_CONNECTED:
// handle socket connected event
break;
case ZMQ_EVENT_CLOSED:
// handle socket closed event
break;
}
}
zmq_close (s);
return NULL;
}
The callback is global (per context), with the socket that triggered the event
passed to the handler as well. Each event also populates a 'zmq_event_data_t'
union with additional metadata which can be used for correlation.
// register a monitor endpoint for all socket events
rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL);
assert (rc == 0);
CAUTION: _zmq_ctx_set_monitor()_ is intended for monitoring infrastructure /
operations concerns only - NOT BUSINESS LOGIC. An event is a representation of
something that happened - you cannot change the past, but only react to them.
The implementation is also only concerned with a single session. No state of
peers, other sessions etc. are tracked - this will only pollute internals and
is the responsibility of application authors to either implement or correlate
in another datastore. Monitor events are exceptional conditions and are thus
not directly in the messaging critical path. However, still be careful with
what you're doing in the callback function as excess time spent in the handler
will block the socket's application thread.
// spawn a monitoring thread
rc = pthread_create (&threads [0], NULL, req_socket_monitor, ctx);
assert (rc == 0);
----
Only
tcp and ipc specific transport even
ts are supported in this initial
Only
connection oriented (tcp and ipc) transpor
ts are supported in this initial
implementation.
Supported events are:
...
...
@@ -52,7 +74,7 @@ ZMQ_EVENT_CONNECTED: connection established
The 'ZMQ_EVENT_CONNECTED' event triggers when a connection has been established
to a remote peer. This can happen either synchronous or asynchronous.
.
Callback
metadata:
.
Event
metadata:
----
data.connected.addr // peer address
data.connected.fd // socket descriptor
...
...
@@ -63,10 +85,10 @@ ZMQ_EVENT_CONNECT_DELAYED: synchronous connect failed, it's being polled
The 'ZMQ_EVENT_CONNECT_DELAYED' event triggers when an immediate connection
attempt is delayed and it's completion's being polled for.
.
Callback
metadata:
.
Event
metadata:
----
data.connect_delayed.addr // peer address
data.connect_delayed.err // errno
data.connect_delayed.err // errno
value
----
ZMQ_EVENT_CONNECT_RETRIED: asynchronous connect / reconnection attempt
...
...
@@ -75,7 +97,7 @@ The 'ZMQ_EVENT_CONNECT_RETRIED' event triggers when a connection attempt
is being handled by reconnect timer. The reconnect interval's recomputed
for each attempt.
.
Callback
metadata:
.
Event
metadata:
----
data.connect_retried.addr // peer address
data.connect_retried.interval // computed reconnect interval
...
...
@@ -86,7 +108,7 @@ ZMQ_EVENT_LISTENING: socket bound to an address, ready to accept connections
The 'ZMQ_EVENT_LISTENING' event triggers when a socket's successfully bound
to a an interface.
.
Callback
metadata:
.
Event
metadata:
----
data.listening.addr // listen address
data.listening.fd // socket descriptor
...
...
@@ -97,10 +119,10 @@ ZMQ_EVENT_BIND_FAILED: socket could not bind to an address
The 'ZMQ_EVENT_BIND_FAILED' event triggers when a socket could not bind to
a given interface.
.
Callback
metadata:
.
Event
metadata:
----
data.bind_failed.addr // listen address
data.bind_failed.err // errno
data.bind_failed.err // errno
value
----
ZMQ_EVENT_ACCEPTED: connection accepted to bound interface
...
...
@@ -108,7 +130,7 @@ ZMQ_EVENT_ACCEPTED: connection accepted to bound interface
The 'ZMQ_EVENT_ACCEPTED' event triggers when a connection from a remote peer
has been established with a socket's listen address.
.
Callback
metadata:
.
Event
metadata:
----
data.accepted.addr // listen address
data.accepted.fd // socket descriptor
...
...
@@ -119,10 +141,10 @@ ZMQ_EVENT_ACCEPT_FAILED: could not accept client connection
The 'ZMQ_EVENT_ACCEPT_FAILED' event triggers when a connection attempt to
a socket's bound address fails.
.
Callback
metadata:
.
Event
metadata:
----
data.accept_failed.addr // listen address
data.accept_failed.err // errno
data.accept_failed.err // errno
value
----
ZMQ_EVENT_CLOSED: connection closed
...
...
@@ -130,7 +152,7 @@ ZMQ_EVENT_CLOSED: connection closed
The 'ZMQ_EVENT_CLOSED' event triggers when a connection's underlying descriptor
has been closed.
.
Callback
metadata:
.
Event
metadata:
----
data.closed.addr // address
data.closed.fd // socket descriptor
...
...
@@ -141,10 +163,10 @@ ZMQ_EVENT_CLOSE_FAILED: connection couldn't be closed
The 'ZMQ_EVENT_CLOSE_FAILED' event triggers when a descriptor could not be
released back to the OS.
.
Callback
metadata:
.
Event
metadata:
----
data.close_failed.addr // address
data.close_failed.err // errno
data.close_failed.err // errno
value
----
ZMQ_EVENT_DISCONNECTED: broken session
...
...
@@ -152,7 +174,7 @@ ZMQ_EVENT_DISCONNECTED: broken session
The 'ZMQ_EVENT_DISCONNECTED' event triggers when the stream engine (tcp and ipc
specific) detects a corrupted / broken session.
.
Callback
metadata:
.
Event
metadata:
----
data.disconnected.addr // address
data.disconnected.fd // socket descriptor
...
...
@@ -160,56 +182,99 @@ data.disconnected.fd // socket descriptor
RETURN VALUE
------------
The _zmq_
ctx_s
et_monitor()_ function returns a value of 0 or greater if
The _zmq_
sock
et_monitor()_ function returns a value of 0 or greater if
successful. Otherwise it returns `-1` and sets 'errno' to one of the values
defined below.
ERRORS
------
*EINVAL*::
The requested callback function _monitor_ is invalid.
*ETERM*::
The 0MQ 'context' associated with the specified 'socket' was terminated.
*EPROTONOSUPPORT*::
The requested 'transport' protocol is not supported. Monitor sockets are
required to use the inproc:// transport.
*EINVAL*::
The endpoint supplied is invalid.
EXAMPLE
-------
.Observing a '
PUB
' socket's connection state
.Observing a '
REP
' socket's connection state
----
void socket_monitor (void *s, int event_, zmq_event_data_t *data_)
// REP socket monitor thread
static void *rep_socket_monitor (void *ctx)
{
switch (event_) {
case ZMQ_EVENT_LISTENING:
printf ("Socket bound to %s, socket descriptor is %d\n",
data.listening.addr, data.listening.fd);
break;
case ZMQ_EVENT_ACCEPTED:
printf ("Accepted connection to %s, socket descriptor is %d\n",
data.accepted.addr, data.accepted.fd);
break;
zmq_event_t event;
int rc;
void *s = zmq_socket (ctx, ZMQ_PAIR);
assert (s);
rc = zmq_connect (s, "inproc://monitor.rep");
assert (rc == 0);
while (true) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0);
if (rc == -1 && zmq_errno() == ETERM) break;
assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
switch (event.event) {
case ZMQ_EVENT_LISTENING:
printf ("listening socket descriptor %d\n", event.data.listening.fd);
printf ("listening socket address %s\n", event.data.listening.addr);
break;
case ZMQ_EVENT_ACCEPTED:
printf ("accepted socket descriptor %d\n", event.data.accepted.fd);
printf ("accepted socket address %s\n", event.data.accepted.addr);
break;
case ZMQ_EVENT_CLOSE_FAILED:
printf ("socket close failure error code %d\n", event.data.close_failed.err);
printf ("socket address %s\n", event.data.close_failed.addr);
break;
case ZMQ_EVENT_CLOSED:
printf ("closed socket descriptor %d\n", event.data.closed.fd);
printf ("closed socket address %s\n", event.data.closed.addr);
break;
case ZMQ_EVENT_DISCONNECTED:
printf ("disconnected socket descriptor %d\n", event.data.disconnected.fd);
printf ("disconnected socket address %s\n", event.data.disconnected.addr);
break;
}
zmq_msg_close (&msg);
}
zmq_close (s);
return NULL;
}
void *context = zmq_ctx_new ();
int rc = zmq_ctx_set_monitor (context, socket_monitor);
// Create the infrastructure
void *ctx = zmq_init (1);
assert (ctx);
// REP socket
rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
// REP socket monitor, all events
rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);
assert (rc == 0);
void *pub = zmq_socket (context, ZMQ_PUB);
assert (pub);
void *sub = zmq_socket (context, ZMQ_SUB);
assert (pub);
rc = zmq_bind (pub, "tcp://127.0.0.1:5560");
rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx);
assert (rc == 0);
rc = zmq_connect (sub, "tcp://127.0.0.1:5560");
rc = zmq_bind (rep, addr);
assert (rc == 0);
// Allow
a window for socket events as connect can be async
// Allow
some time for event detection
zmq_sleep (1);
rc = zmq_close (pub);
assert (rc == 0);
rc = zmq_close (sub);
// Close the REP socket
rc = zmq_close (rep);
assert (rc == 0);
zmq_term (c
ontext
);
zmq_term (c
tx
);
----
...
...
include/zmq.h
View file @
2a209140
...
...
@@ -278,8 +278,16 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_EVENT_CLOSE_FAILED 256
#define ZMQ_EVENT_DISCONNECTED 512
#define ZMQ_EVENT_ALL ( ZMQ_EVENT_CONNECTED | ZMQ_EVENT_CONNECT_DELAYED | \
ZMQ_EVENT_CONNECT_RETRIED | ZMQ_EVENT_LISTENING | \
ZMQ_EVENT_BIND_FAILED | ZMQ_EVENT_ACCEPTED | \
ZMQ_EVENT_ACCEPT_FAILED | ZMQ_EVENT_CLOSED | \
ZMQ_EVENT_CLOSE_FAILED | ZMQ_EVENT_DISCONNECTED )
/* Socket event data (union member per event) */
typedef
union
{
typedef
struct
{
int
event
;
union
{
struct
{
char
*
addr
;
int
fd
;
...
...
@@ -320,12 +328,8 @@ typedef union {
char
*
addr
;
int
fd
;
}
disconnected
;
}
zmq_event_data_t
;
/* Callback template for socket state changes */
typedef
void
(
zmq_monitor_fn
)
(
void
*
s
,
int
event
,
zmq_event_data_t
*
data
);
ZMQ_EXPORT
int
zmq_ctx_set_monitor
(
void
*
context
,
zmq_monitor_fn
*
monitor
);
}
data
;
}
zmq_event_t
;
ZMQ_EXPORT
void
*
zmq_socket
(
void
*
,
int
type
);
ZMQ_EXPORT
int
zmq_close
(
void
*
s
);
...
...
@@ -339,6 +343,7 @@ ZMQ_EXPORT int zmq_unbind (void *s, const char *addr);
ZMQ_EXPORT
int
zmq_disconnect
(
void
*
s
,
const
char
*
addr
);
ZMQ_EXPORT
int
zmq_send
(
void
*
s
,
const
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
int
zmq_recv
(
void
*
s
,
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
int
zmq_socket_monitor
(
void
*
s
,
const
char
*
addr
,
int
events
);
ZMQ_EXPORT
int
zmq_sendmsg
(
void
*
s
,
zmq_msg_t
*
msg
,
int
flags
);
ZMQ_EXPORT
int
zmq_recvmsg
(
void
*
s
,
zmq_msg_t
*
msg
,
int
flags
);
...
...
src/ctx.cpp
View file @
2a209140
...
...
@@ -45,8 +45,7 @@ zmq::ctx_t::ctx_t () :
slot_count
(
0
),
slots
(
NULL
),
max_sockets
(
ZMQ_MAX_SOCKETS_DFLT
),
io_thread_count
(
ZMQ_IO_THREADS_DFLT
),
monitor_fn
(
NULL
)
io_thread_count
(
ZMQ_IO_THREADS_DFLT
)
{
}
...
...
@@ -126,12 +125,6 @@ int zmq::ctx_t::terminate ()
return
0
;
}
int
zmq
::
ctx_t
::
monitor
(
zmq_monitor_fn
*
monitor_
)
{
monitor_fn
=
monitor_
;
return
0
;
}
int
zmq
::
ctx_t
::
set
(
int
option_
,
int
optval_
)
{
int
rc
=
0
;
...
...
@@ -353,67 +346,6 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
return
endpoint
;
}
void
zmq
::
ctx_t
::
monitor_event
(
zmq
::
socket_base_t
*
socket_
,
int
event_
,
...)
{
va_list
args
;
va_start
(
args
,
event_
);
va_monitor_event
(
socket_
,
event_
,
args
);
va_end
(
args
);
}
void
zmq
::
ctx_t
::
va_monitor_event
(
zmq
::
socket_base_t
*
socket_
,
int
event_
,
va_list
args_
)
{
if
(
monitor_fn
!=
NULL
)
{
zmq_event_data_t
data
;
memset
(
&
data
,
0
,
sizeof
(
zmq_event_data_t
));
switch
(
event_
)
{
case
ZMQ_EVENT_CONNECTED
:
data
.
connected
.
addr
=
va_arg
(
args_
,
char
*
);
data
.
connected
.
fd
=
va_arg
(
args_
,
int
);
break
;
case
ZMQ_EVENT_CONNECT_DELAYED
:
data
.
connect_delayed
.
addr
=
va_arg
(
args_
,
char
*
);
data
.
connect_delayed
.
err
=
va_arg
(
args_
,
int
);
break
;
case
ZMQ_EVENT_CONNECT_RETRIED
:
data
.
connect_retried
.
addr
=
va_arg
(
args_
,
char
*
);
data
.
connect_retried
.
interval
=
va_arg
(
args_
,
int
);
break
;
case
ZMQ_EVENT_LISTENING
:
data
.
listening
.
addr
=
va_arg
(
args_
,
char
*
);
data
.
listening
.
fd
=
va_arg
(
args_
,
int
);
break
;
case
ZMQ_EVENT_BIND_FAILED
:
data
.
bind_failed
.
addr
=
va_arg
(
args_
,
char
*
);
data
.
bind_failed
.
err
=
va_arg
(
args_
,
int
);
break
;
case
ZMQ_EVENT_ACCEPTED
:
data
.
accepted
.
addr
=
va_arg
(
args_
,
char
*
);
data
.
accepted
.
fd
=
va_arg
(
args_
,
int
);
break
;
case
ZMQ_EVENT_ACCEPT_FAILED
:
data
.
accept_failed
.
addr
=
va_arg
(
args_
,
char
*
);
data
.
accept_failed
.
err
=
va_arg
(
args_
,
int
);
break
;
case
ZMQ_EVENT_CLOSED
:
data
.
closed
.
addr
=
va_arg
(
args_
,
char
*
);
data
.
closed
.
fd
=
va_arg
(
args_
,
int
);
break
;
case
ZMQ_EVENT_CLOSE_FAILED
:
data
.
close_failed
.
addr
=
va_arg
(
args_
,
char
*
);
data
.
close_failed
.
err
=
va_arg
(
args_
,
int
);
break
;
case
ZMQ_EVENT_DISCONNECTED
:
data
.
disconnected
.
addr
=
va_arg
(
args_
,
char
*
);
data
.
disconnected
.
fd
=
va_arg
(
args_
,
int
);
break
;
default
:
zmq_assert
(
false
);
}
monitor_fn
((
void
*
)
socket_
,
event_
,
&
data
);
}
}
// The last used socket ID, or 0 if no socket was used so far. Note that this
// is a global variable. Thus, even sockets created in different contexts have
// unique IDs.
...
...
src/ctx.hpp
View file @
2a209140
...
...
@@ -95,11 +95,6 @@ namespace zmq
void
unregister_endpoints
(
zmq
::
socket_base_t
*
socket_
);
endpoint_t
find_endpoint
(
const
char
*
addr_
);
// Monitoring specific
int
monitor
(
zmq_monitor_fn
*
monitor_
);
void
monitor_event
(
zmq
::
socket_base_t
*
socket_
,
int
event_
,
...);
void
va_monitor_event
(
zmq
::
socket_base_t
*
socket_
,
int
event_
,
va_list
args_
);
enum
{
term_tid
=
0
,
reaper_tid
=
1
...
...
@@ -169,9 +164,6 @@ namespace zmq
// Synchronisation of access to context options.
mutex_t
opt_sync
;
// Monitoring callback
zmq_monitor_fn
*
monitor_fn
;
ctx_t
(
const
ctx_t
&
);
const
ctx_t
&
operator
=
(
const
ctx_t
&
);
};
...
...
src/ipc_connecter.cpp
View file @
2a209140
...
...
@@ -56,6 +56,7 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
zmq_assert
(
addr
);
zmq_assert
(
addr
->
protocol
==
"ipc"
);
addr
->
to_string
(
endpoint
);
socket
=
session
->
get_socket
();
}
zmq
::
ipc_connecter_t
::~
ipc_connecter_t
()
...
...
@@ -121,7 +122,7 @@ void zmq::ipc_connecter_t::out_event ()
// Shut the connecter down.
terminate
();
s
ession
->
monitor_event
(
ZMQ_EVENT_CONNECTED
,
endpoint
.
c_str
(),
fd
);
s
ocket
->
event_connected
(
endpoint
.
c_str
(),
fd
);
}
void
zmq
::
ipc_connecter_t
::
timer_event
(
int
id_
)
...
...
@@ -148,7 +149,7 @@ void zmq::ipc_connecter_t::start_connecting ()
handle
=
add_fd
(
s
);
handle_valid
=
true
;
set_pollout
(
handle
);
s
ession
->
monitor_event
(
ZMQ_EVENT_CONNECT_DELAYED
,
endpoint
.
c_str
(),
zmq_errno
());
s
ocket
->
event_connect_delayed
(
endpoint
.
c_str
(),
zmq_errno
());
}
// Handle any other error condition by eventual reconnect.
...
...
@@ -163,7 +164,7 @@ void zmq::ipc_connecter_t::add_reconnect_timer()
{
int
rc_ivl
=
get_new_reconnect_ivl
();
add_timer
(
rc_ivl
,
reconnect_timer_id
);
s
ession
->
monitor_event
(
ZMQ_EVENT_CONNECT_RETRIED
,
endpoint
.
c_str
(),
rc_ivl
);
s
ocket
->
event_connect_retried
(
endpoint
.
c_str
(),
rc_ivl
);
timer_started
=
true
;
}
...
...
@@ -224,7 +225,7 @@ int zmq::ipc_connecter_t::close ()
zmq_assert
(
s
!=
retired_fd
);
int
rc
=
::
close
(
s
);
errno_assert
(
rc
==
0
);
s
ession
->
monitor_event
(
ZMQ_EVENT_CLOSED
,
endpoint
.
c_str
(),
s
);
s
ocket
->
event_closed
(
endpoint
.
c_str
(),
s
);
s
=
retired_fd
;
return
0
;
}
...
...
src/ipc_connecter.hpp
View file @
2a209140
...
...
@@ -113,6 +113,9 @@ namespace zmq
// String representation of endpoint to connect to
std
::
string
endpoint
;
// Socket
zmq
::
socket_base_t
*
socket
;
ipc_connecter_t
(
const
ipc_connecter_t
&
);
const
ipc_connecter_t
&
operator
=
(
const
ipc_connecter_t
&
);
};
...
...
src/ipc_listener.cpp
View file @
2a209140
...
...
@@ -76,7 +76,7 @@ void zmq::ipc_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if
(
fd
==
retired_fd
)
{
socket
->
monitor_event
(
ZMQ_EVENT_ACCEPT_FAILED
,
endpoint
.
c_str
(),
zmq_errno
());
socket
->
event_accept_failed
(
endpoint
.
c_str
(),
zmq_errno
());
return
;
}
...
...
@@ -96,7 +96,7 @@ void zmq::ipc_listener_t::in_event ()
session
->
inc_seqnum
();
launch_child
(
session
);
send_attach
(
session
,
engine
,
false
);
socket
->
monitor_event
(
ZMQ_EVENT_ACCEPTED
,
endpoint
.
c_str
(),
fd
);
socket
->
event_accepted
(
endpoint
.
c_str
(),
fd
);
}
int
zmq
::
ipc_listener_t
::
get_address
(
std
::
string
&
addr_
)
...
...
@@ -155,7 +155,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if
(
rc
!=
0
)
goto
error
;
socket
->
monitor_event
(
ZMQ_EVENT_LISTENING
,
endpoint
.
c_str
(),
s
);
socket
->
event_listening
(
endpoint
.
c_str
(),
s
);
return
0
;
error
:
...
...
@@ -178,12 +178,12 @@ int zmq::ipc_listener_t::close ()
if
(
has_file
&&
!
filename
.
empty
())
{
rc
=
::
unlink
(
filename
.
c_str
());
if
(
rc
!=
0
)
{
socket
->
monitor_event
(
ZMQ_EVENT_CLOSE_FAILED
,
endpoint
.
c_str
(),
zmq_errno
());
socket
->
event_close_failed
(
endpoint
.
c_str
(),
zmq_errno
());
return
-
1
;
}
}
socket
->
monitor_event
(
ZMQ_EVENT_CLOSED
,
endpoint
.
c_str
(),
s
);
socket
->
event_closed
(
endpoint
.
c_str
(),
s
);
return
0
;
}
...
...
src/session_base.cpp
View file @
2a209140
...
...
@@ -23,7 +23,6 @@
#include <stdarg.h>
#include "session_base.hpp"
#include "socket_base.hpp"
#include "i_engine.hpp"
#include "err.hpp"
#include "pipe.hpp"
...
...
@@ -286,17 +285,9 @@ void zmq::session_base_t::hiccuped (pipe_t *)
zmq_assert
(
false
);
}
void
zmq
::
session_base_t
::
monitor_event
(
int
event_
,
...
)
zmq
::
socket_base_t
*
zmq
::
session_base_t
::
get_socket
(
)
{
va_list
args
;
va_start
(
args
,
event_
);
va_monitor_event
(
event_
,
args
);
va_end
(
args
);
}
void
zmq
::
session_base_t
::
va_monitor_event
(
int
event_
,
va_list
args
)
{
socket
->
va_monitor_event
(
event_
,
args
);
return
socket
;
}
void
zmq
::
session_base_t
::
process_plug
()
...
...
src/session_base.hpp
View file @
2a209140
...
...
@@ -31,6 +31,7 @@
#include "pipe.hpp"
#include "i_msg_source.hpp"
#include "i_msg_sink.hpp"
#include "socket_base.hpp"
namespace
zmq
{
...
...
@@ -75,8 +76,7 @@ namespace zmq
void
hiccuped
(
zmq
::
pipe_t
*
pipe_
);
void
terminated
(
zmq
::
pipe_t
*
pipe_
);
void
monitor_event
(
int
event_
,
...);
void
va_monitor_event
(
int
event_
,
va_list
args
);
socket_base_t
*
get_socket
();
protected
:
...
...
src/socket_base.cpp
View file @
2a209140
...
...
@@ -130,13 +130,16 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
destroyed
(
false
),
last_tsc
(
0
),
ticks
(
0
),
rcvmore
(
false
)
rcvmore
(
false
),
monitor_socket
(
NULL
),
monitor_events
(
0
)
{
options
.
socket_id
=
sid_
;
}
zmq
::
socket_base_t
::~
socket_base_t
()
{
stop_monitor
();
zmq_assert
(
destroyed
);
}
...
...
@@ -354,7 +357,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int
rc
=
listener
->
set_address
(
address
.
c_str
());
if
(
rc
!=
0
)
{
delete
listener
;
monitor_event
(
ZMQ_EVENT_BIND_FAILED
,
addr_
,
zmq_errno
());
event_bind_failed
(
addr_
,
zmq_errno
());
return
-
1
;
}
...
...
@@ -373,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int
rc
=
listener
->
set_address
(
address
.
c_str
());
if
(
rc
!=
0
)
{
delete
listener
;
monitor_event
(
ZMQ_EVENT_BIND_FAILED
,
addr_
,
zmq_errno
());
event_bind_failed
(
addr_
,
zmq_errno
());
return
-
1
;
}
...
...
@@ -845,6 +848,7 @@ void zmq::socket_base_t::process_stop ()
// We'll remember the fact so that any blocking call is interrupted and any
// further attempt to use the socket will return ETERM. The user is still
// responsible for calling zmq_close on the socket though!
stop_monitor
();
ctx_terminated
=
true
;
}
...
...
@@ -996,15 +1000,172 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
rcvmore
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
}
void
zmq
::
socket_base_t
::
monitor_event
(
int
event_
,
...
)
int
zmq
::
socket_base_t
::
monitor
(
const
char
*
addr_
,
int
events_
)
{
va_list
args
;
va_start
(
args
,
event_
);
va_monitor_event
(
event_
,
args
);
va_end
(
args
);
int
rc
;
if
(
unlikely
(
ctx_terminated
))
{
errno
=
ETERM
;
return
-
1
;
}
// Support deregistering monitoring endpoints as well
if
(
addr_
==
NULL
)
{
stop_monitor
();
return
0
;
}
// Parse addr_ string.
std
::
string
protocol
;
std
::
string
address
;
rc
=
parse_uri
(
addr_
,
protocol
,
address
);
if
(
rc
!=
0
)
return
-
1
;
rc
=
check_protocol
(
protocol
);
if
(
rc
!=
0
)
return
-
1
;
// Event notification only supported over inproc://
if
(
protocol
!=
"inproc"
)
{
errno
=
EPROTONOSUPPORT
;
return
-
1
;
}
// Register events to monitor
monitor_events
=
events_
;
monitor_socket
=
zmq_socket
(
get_ctx
(),
ZMQ_PAIR
);
if
(
monitor_socket
==
NULL
)
return
-
1
;
// Never block context termination on pending event messages
int
linger
=
0
;
rc
=
zmq_setsockopt
(
monitor_socket
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
if
(
rc
==
-
1
)
stop_monitor
();
// Spawn the monitor socket endpoint
rc
=
zmq_bind
(
monitor_socket
,
addr_
);
if
(
rc
==
-
1
)
stop_monitor
();
return
rc
;
}
void
zmq
::
socket_base_t
::
event_connected
(
const
char
*
addr_
,
int
fd_
)
{
zmq_event_t
event
;
if
(
!
(
monitor_events
&
ZMQ_EVENT_CONNECTED
))
return
;
event
.
event
=
ZMQ_EVENT_CONNECTED
;
event
.
data
.
connected
.
addr
=
(
char
*
)
addr_
;
event
.
data
.
connected
.
fd
=
fd_
;
monitor_event
(
event
);
}
void
zmq
::
socket_base_t
::
va_monitor_event
(
int
event_
,
va_list
args
)
void
zmq
::
socket_base_t
::
event_connect_delayed
(
const
char
*
addr_
,
int
err_
)
{
get_ctx
()
->
va_monitor_event
(
this
,
event_
,
args
);
zmq_event_t
event
;
if
(
!
(
monitor_events
&
ZMQ_EVENT_CONNECT_DELAYED
))
return
;
event
.
event
=
ZMQ_EVENT_CONNECT_DELAYED
;
event
.
data
.
connected
.
addr
=
(
char
*
)
addr_
;
event
.
data
.
connect_delayed
.
err
=
err_
;
monitor_event
(
event
);
}
void
zmq
::
socket_base_t
::
event_connect_retried
(
const
char
*
addr_
,
int
interval_
)
{
zmq_event_t
event
;
if
(
!
(
monitor_events
&
ZMQ_EVENT_CONNECT_RETRIED
))
return
;
event
.
event
=
ZMQ_EVENT_CONNECT_RETRIED
;
event
.
data
.
connected
.
addr
=
(
char
*
)
addr_
;
event
.
data
.
connect_retried
.
interval
=
interval_
;
monitor_event
(
event
);
}
void
zmq
::
socket_base_t
::
event_listening
(
const
char
*
addr_
,
int
fd_
)
{
zmq_event_t
event
;
if
(
!
(
monitor_events
&
ZMQ_EVENT_LISTENING
))
return
;
event
.
event
=
ZMQ_EVENT_LISTENING
;
event
.
data
.
connected
.
addr
=
(
char
*
)
addr_
;
event
.
data
.
listening
.
fd
=
fd_
;
monitor_event
(
event
);
}
void
zmq
::
socket_base_t
::
event_bind_failed
(
const
char
*
addr_
,
int
err_
)
{
zmq_event_t
event
;
if
(
!
(
monitor_events
&
ZMQ_EVENT_BIND_FAILED
))
return
;
event
.
event
=
ZMQ_EVENT_BIND_FAILED
;
event
.
data
.
connected
.
addr
=
(
char
*
)
addr_
;
event
.
data
.
bind_failed
.
err
=
err_
;
monitor_event
(
event
);
}
void
zmq
::
socket_base_t
::
event_accepted
(
const
char
*
addr_
,
int
fd_
)
{
zmq_event_t
event
;
if
(
!
(
monitor_events
&
ZMQ_EVENT_ACCEPTED
))
return
;
event
.
event
=
ZMQ_EVENT_ACCEPTED
;
event
.
data
.
connected
.
addr
=
(
char
*
)
addr_
;
event
.
data
.
accepted
.
fd
=
fd_
;
monitor_event
(
event
);
}
void
zmq
::
socket_base_t
::
event_accept_failed
(
const
char
*
addr_
,
int
err_
)
{
zmq_event_t
event
;
if
(
!
(
monitor_events
&
ZMQ_EVENT_ACCEPT_FAILED
))
return
;
event
.
event
=
ZMQ_EVENT_ACCEPT_FAILED
;
event
.
data
.
connected
.
addr
=
(
char
*
)
addr_
;
event
.
data
.
accept_failed
.
err
=
err_
;
monitor_event
(
event
);
}
void
zmq
::
socket_base_t
::
event_closed
(
const
char
*
addr_
,
int
fd_
)
{
zmq_event_t
event
;
if
(
!
(
monitor_events
&
ZMQ_EVENT_CLOSED
))
return
;
event
.
event
=
ZMQ_EVENT_CLOSED
;
event
.
data
.
connected
.
addr
=
(
char
*
)
addr_
;
event
.
data
.
closed
.
fd
=
fd_
;
monitor_event
(
event
);
}
void
zmq
::
socket_base_t
::
event_close_failed
(
const
char
*
addr_
,
int
err_
)
{
zmq_event_t
event
;
if
(
!
(
monitor_events
&
ZMQ_EVENT_CLOSE_FAILED
))
return
;
event
.
event
=
ZMQ_EVENT_CLOSE_FAILED
;
event
.
data
.
connected
.
addr
=
(
char
*
)
addr_
;
event
.
data
.
close_failed
.
err
=
err_
;
monitor_event
(
event
);
}
void
zmq
::
socket_base_t
::
event_disconnected
(
const
char
*
addr_
,
int
fd_
)
{
zmq_event_t
event
;
if
(
!
(
monitor_events
&
ZMQ_EVENT_DISCONNECTED
))
return
;
event
.
event
=
ZMQ_EVENT_DISCONNECTED
;
event
.
data
.
connected
.
addr
=
(
char
*
)
addr_
;
event
.
data
.
disconnected
.
fd
=
fd_
;
monitor_event
(
event
);
}
void
zmq
::
socket_base_t
::
monitor_event
(
zmq_event_t
event_
)
{
zmq_msg_t
msg
;
if
(
!
monitor_socket
)
return
;
zmq_msg_init_size
(
&
msg
,
sizeof
(
event_
));
memcpy
(
zmq_msg_data
(
&
msg
),
&
event_
,
sizeof
(
event_
));
zmq_sendmsg
(
monitor_socket
,
&
msg
,
0
);
zmq_msg_close
(
&
msg
);
}
void
zmq
::
socket_base_t
::
stop_monitor
()
{
if
(
monitor_socket
)
{
zmq_close
(
monitor_socket
);
monitor_socket
=
NULL
;
monitor_events
=
0
;
}
}
\ No newline at end of file
src/socket_base.hpp
View file @
2a209140
...
...
@@ -102,8 +102,18 @@ namespace zmq
void
lock
();
void
unlock
();
void
monitor_event
(
int
event_
,
...);
void
va_monitor_event
(
int
event_
,
va_list
args
);
int
monitor
(
const
char
*
endpoint_
,
int
events_
);
void
event_connected
(
const
char
*
addr_
,
int
fd_
);
void
event_connect_delayed
(
const
char
*
addr_
,
int
err_
);
void
event_connect_retried
(
const
char
*
addr_
,
int
interval_
);
void
event_listening
(
const
char
*
addr_
,
int
fd_
);
void
event_bind_failed
(
const
char
*
addr_
,
int
err_
);
void
event_accepted
(
const
char
*
addr_
,
int
fd_
);
void
event_accept_failed
(
const
char
*
addr_
,
int
err_
);
void
event_closed
(
const
char
*
addr_
,
int
fd_
);
void
event_close_failed
(
const
char
*
addr_
,
int
fd_
);
void
event_disconnected
(
const
char
*
addr_
,
int
fd_
);
protected
:
...
...
@@ -138,6 +148,12 @@ namespace zmq
// Delay actual destruction of the socket.
void
process_destroy
();
// Socket event data dispath
void
monitor_event
(
zmq_event_t
data_
);
// Monitor socket cleanup
void
stop_monitor
();
private
:
// Creates new endpoint ID and adds the endpoint to the map.
void
add_endpoint
(
const
char
*
addr_
,
own_t
*
endpoint_
);
...
...
@@ -210,6 +226,12 @@ namespace zmq
// Improves efficiency of time measurement.
clock_t
clock
;
// Monitor socket;
void
*
monitor_socket
;
// Bitmask of events being monitored
int
monitor_events
;
socket_base_t
(
const
socket_base_t
&
);
const
socket_base_t
&
operator
=
(
const
socket_base_t
&
);
mutex_t
sync
;
...
...
src/stream_engine.cpp
View file @
2a209140
...
...
@@ -62,7 +62,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
session
(
NULL
),
options
(
options_
),
endpoint
(
endpoint_
),
plugged
(
false
)
plugged
(
false
),
socket
(
NULL
)
{
// Put the socket into non-blocking mode.
unblock_socket
(
s
);
...
...
@@ -126,6 +127,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
zmq_assert
(
!
session
);
zmq_assert
(
session_
);
session
=
session_
;
socket
=
session
->
get_socket
();
// Connect to I/O threads poller object.
io_object_t
::
plug
(
io_thread_
);
...
...
@@ -445,7 +447,7 @@ int zmq::stream_engine_t::push_msg (msg_t *msg_)
void
zmq
::
stream_engine_t
::
error
()
{
zmq_assert
(
session
);
s
ession
->
monitor_event
(
ZMQ_EVENT_DISCONNECTED
,
endpoint
.
c_str
(),
s
);
s
ocket
->
event_disconnected
(
endpoint
.
c_str
(),
s
);
session
->
detach
();
unplug
();
delete
this
;
...
...
src/stream_engine.hpp
View file @
2a209140
...
...
@@ -31,6 +31,7 @@
#include "i_encoder.hpp"
#include "i_decoder.hpp"
#include "options.hpp"
#include "socket_base.hpp"
#include "../include/zmq.h"
namespace
zmq
...
...
@@ -133,6 +134,9 @@ namespace zmq
bool
plugged
;
// Socket
zmq
::
socket_base_t
*
socket
;
stream_engine_t
(
const
stream_engine_t
&
);
const
stream_engine_t
&
operator
=
(
const
stream_engine_t
&
);
};
...
...
src/tcp_connecter.cpp
View file @
2a209140
...
...
@@ -66,6 +66,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
zmq_assert
(
addr
);
zmq_assert
(
addr
->
protocol
==
"tcp"
);
addr
->
to_string
(
endpoint
);
socket
=
session
->
get_socket
();
}
zmq
::
tcp_connecter_t
::~
tcp_connecter_t
()
...
...
@@ -135,7 +136,7 @@ void zmq::tcp_connecter_t::out_event ()
// Shut the connecter down.
terminate
();
s
ession
->
monitor_event
(
ZMQ_EVENT_CONNECTED
,
endpoint
.
c_str
(),
fd
);
s
ocket
->
event_connected
(
endpoint
.
c_str
(),
fd
);
}
void
zmq
::
tcp_connecter_t
::
timer_event
(
int
id_
)
...
...
@@ -162,7 +163,7 @@ void zmq::tcp_connecter_t::start_connecting ()
handle
=
add_fd
(
s
);
handle_valid
=
true
;
set_pollout
(
handle
);
s
ession
->
monitor_event
(
ZMQ_EVENT_CONNECT_DELAYED
,
endpoint
.
c_str
(),
zmq_errno
());
s
ocket
->
event_connect_delayed
(
endpoint
.
c_str
(),
zmq_errno
());
}
// Handle any other error condition by eventual reconnect.
...
...
@@ -177,7 +178,7 @@ void zmq::tcp_connecter_t::add_reconnect_timer()
{
int
rc_ivl
=
get_new_reconnect_ivl
();
add_timer
(
rc_ivl
,
reconnect_timer_id
);
s
ession
->
monitor_event
(
ZMQ_EVENT_CONNECT_RETRIED
,
endpoint
.
c_str
(),
rc_ivl
);
s
ocket
->
event_connect_retried
(
endpoint
.
c_str
(),
rc_ivl
);
timer_started
=
true
;
}
...
...
@@ -303,6 +304,6 @@ void zmq::tcp_connecter_t::close ()
int
rc
=
::
close
(
s
);
errno_assert
(
rc
==
0
);
#endif
s
ession
->
monitor_event
(
ZMQ_EVENT_CLOSED
,
endpoint
.
c_str
(),
s
);
s
ocket
->
event_closed
(
endpoint
.
c_str
(),
s
);
s
=
retired_fd
;
}
src/tcp_connecter.hpp
View file @
2a209140
...
...
@@ -111,6 +111,9 @@ namespace zmq
// String representation of endpoint to connect to
std
::
string
endpoint
;
// Socket
zmq
::
socket_base_t
*
socket
;
tcp_connecter_t
(
const
tcp_connecter_t
&
);
const
tcp_connecter_t
&
operator
=
(
const
tcp_connecter_t
&
);
};
...
...
src/tcp_listener.cpp
View file @
2a209140
...
...
@@ -85,7 +85,7 @@ void zmq::tcp_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if
(
fd
==
retired_fd
)
{
socket
->
monitor_event
(
ZMQ_EVENT_ACCEPT_FAILED
,
endpoint
.
c_str
(),
zmq_errno
());
socket
->
event_accept_failed
(
endpoint
.
c_str
(),
zmq_errno
());
return
;
}
...
...
@@ -108,7 +108,7 @@ void zmq::tcp_listener_t::in_event ()
session
->
inc_seqnum
();
launch_child
(
session
);
send_attach
(
session
,
engine
,
false
);
socket
->
monitor_event
(
ZMQ_EVENT_ACCEPTED
,
endpoint
.
c_str
(),
fd
);
socket
->
event_accepted
(
endpoint
.
c_str
(),
fd
);
}
void
zmq
::
tcp_listener_t
::
close
()
...
...
@@ -121,7 +121,7 @@ void zmq::tcp_listener_t::close ()
int
rc
=
::
close
(
s
);
errno_assert
(
rc
==
0
);
#endif
socket
->
monitor_event
(
ZMQ_EVENT_CLOSED
,
endpoint
.
c_str
(),
s
);
socket
->
event_closed
(
endpoint
.
c_str
(),
s
);
s
=
retired_fd
;
}
...
...
@@ -223,7 +223,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
goto
error
;
#endif
socket
->
monitor_event
(
ZMQ_EVENT_LISTENING
,
endpoint
.
c_str
(),
s
);
socket
->
event_listening
(
endpoint
.
c_str
(),
s
);
return
0
;
error
:
...
...
src/zmq.cpp
View file @
2a209140
...
...
@@ -210,15 +210,6 @@ int zmq_ctx_get (void *ctx_, int option_)
return
((
zmq
::
ctx_t
*
)
ctx_
)
->
get
(
option_
);
}
int
zmq_ctx_set_monitor
(
void
*
ctx_
,
zmq_monitor_fn
*
monitor_
)
{
if
(
!
ctx_
||
!
((
zmq
::
ctx_t
*
)
ctx_
)
->
check_tag
())
{
errno
=
EFAULT
;
return
-
1
;
}
return
((
zmq
::
ctx_t
*
)
ctx_
)
->
monitor
(
monitor_
);
}
// Stable/legacy context API
void
*
zmq_init
(
int
io_threads_
)
...
...
@@ -284,6 +275,17 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
return
result
;
}
int
zmq_socket_monitor
(
void
*
s_
,
const
char
*
addr_
,
int
events_
)
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
return
-
1
;
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
result
=
s
->
monitor
(
addr_
,
events_
);
return
result
;
}
int
zmq_bind
(
void
*
s_
,
const
char
*
addr_
)
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
...
...
tests/test_monitor.cpp
View file @
2a209140
...
...
@@ -21,89 +21,226 @@
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <pthread.h>
#include <string.h>
#include "testutil.hpp"
static
int
events
;
// REQ socket events handled
static
int
req_socket_events
;
// 2nd REQ socket events handled
static
int
req2_socket_events
;
// REP socket events handled
static
int
rep_socket_events
;
typedef
void
*
ZmqSocket
;
ZmqSocket
rep
,
req
;
const
char
*
addr
;
void
socket_monitor
(
ZmqSocket
s
,
int
event_
,
zmq_event_data_t
*
data_
)
extern
"C"
{
assert
(
s
==
rep
||
s
==
req
);
const
char
*
addr
=
"tcp://127.0.0.1:5560"
;
// Only some of the exceptional events could fire
switch
(
event_
)
{
// listener specific
case
ZMQ_EVENT_LISTENING
:
assert
(
s
==
rep
);
assert
(
data_
->
listening
.
fd
>
0
);
assert
(
!
strcmp
(
data_
->
listening
.
addr
,
addr
));
events
|=
ZMQ_EVENT_LISTENING
;
break
;
case
ZMQ_EVENT_ACCEPTED
:
assert
(
s
==
rep
);
assert
(
data_
->
accepted
.
fd
>
0
);
assert
(
!
strcmp
(
data_
->
accepted
.
addr
,
addr
));
events
|=
ZMQ_EVENT_ACCEPTED
;
break
;
// connecter specific
case
ZMQ_EVENT_CONNECTED
:
assert
(
s
==
req
);
assert
(
data_
->
connected
.
fd
>
0
);
assert
(
!
strcmp
(
data_
->
connected
.
addr
,
addr
));
events
|=
ZMQ_EVENT_CONNECTED
;
break
;
case
ZMQ_EVENT_CONNECT_DELAYED
:
assert
(
s
==
req
);
assert
(
data_
->
connect_delayed
.
err
!=
0
);
assert
(
!
strcmp
(
data_
->
connect_delayed
.
addr
,
addr
));
events
|=
ZMQ_EVENT_CONNECT_DELAYED
;
break
;
// generic - either end of the socket
case
ZMQ_EVENT_CLOSE_FAILED
:
assert
(
data_
->
close_failed
.
err
!=
0
);
assert
(
!
strcmp
(
data_
->
close_failed
.
addr
,
addr
));
events
|=
ZMQ_EVENT_CLOSE_FAILED
;
break
;
case
ZMQ_EVENT_CLOSED
:
assert
(
data_
->
closed
.
fd
!=
0
);
assert
(
!
strcmp
(
data_
->
closed
.
addr
,
addr
));
events
|=
ZMQ_EVENT_CLOSED
;
break
;
case
ZMQ_EVENT_DISCONNECTED
:
assert
(
data_
->
disconnected
.
fd
!=
0
);
assert
(
!
strcmp
(
data_
->
disconnected
.
addr
,
addr
));
events
|=
ZMQ_EVENT_DISCONNECTED
;
break
;
default:
// out of band / unexpected event
assert
(
0
);
// REQ socket monitor thread
static
void
*
req_socket_monitor
(
void
*
ctx
)
{
zmq_event_t
event
;
int
rc
;
void
*
s
=
zmq_socket
(
ctx
,
ZMQ_PAIR
);
assert
(
s
);
rc
=
zmq_connect
(
s
,
"inproc://monitor.req"
);
assert
(
rc
==
0
);
while
(
true
)
{
zmq_msg_t
msg
;
zmq_msg_init
(
&
msg
);
rc
=
zmq_recvmsg
(
s
,
&
msg
,
0
);
if
(
rc
==
-
1
&&
zmq_errno
()
==
ETERM
)
break
;
assert
(
rc
!=
-
1
);
memcpy
(
&
event
,
zmq_msg_data
(
&
msg
),
sizeof
(
event
));
switch
(
event
.
event
)
{
case
ZMQ_EVENT_CONNECTED
:
assert
(
event
.
data
.
connected
.
fd
>
0
);
assert
(
!
strcmp
(
event
.
data
.
connected
.
addr
,
addr
));
req_socket_events
|=
ZMQ_EVENT_CONNECTED
;
req2_socket_events
|=
ZMQ_EVENT_CONNECTED
;
break
;
case
ZMQ_EVENT_CONNECT_DELAYED
:
assert
(
event
.
data
.
connect_delayed
.
err
!=
0
);
assert
(
!
strcmp
(
event
.
data
.
connect_delayed
.
addr
,
addr
));
req_socket_events
|=
ZMQ_EVENT_CONNECT_DELAYED
;
break
;
case
ZMQ_EVENT_CLOSE_FAILED
:
assert
(
event
.
data
.
close_failed
.
err
!=
0
);
assert
(
!
strcmp
(
event
.
data
.
close_failed
.
addr
,
addr
));
req_socket_events
|=
ZMQ_EVENT_CLOSE_FAILED
;
break
;
case
ZMQ_EVENT_CLOSED
:
assert
(
event
.
data
.
closed
.
fd
!=
0
);
assert
(
!
strcmp
(
event
.
data
.
closed
.
addr
,
addr
));
req_socket_events
|=
ZMQ_EVENT_CLOSED
;
break
;
case
ZMQ_EVENT_DISCONNECTED
:
assert
(
event
.
data
.
disconnected
.
fd
!=
0
);
assert
(
!
strcmp
(
event
.
data
.
disconnected
.
addr
,
addr
));
req_socket_events
|=
ZMQ_EVENT_DISCONNECTED
;
break
;
}
}
zmq_close
(
s
);
return
NULL
;
}
}
extern
"C"
{
// 2nd REQ socket monitor thread
static
void
*
req2_socket_monitor
(
void
*
ctx
)
{
zmq_event_t
event
;
int
rc
;
void
*
s
=
zmq_socket
(
ctx
,
ZMQ_PAIR
);
assert
(
s
);
rc
=
zmq_connect
(
s
,
"inproc://monitor.req2"
);
assert
(
rc
==
0
);
while
(
true
)
{
zmq_msg_t
msg
;
zmq_msg_init
(
&
msg
);
rc
=
zmq_recvmsg
(
s
,
&
msg
,
0
);
if
(
rc
==
-
1
&&
zmq_errno
()
==
ETERM
)
break
;
assert
(
rc
!=
-
1
);
memcpy
(
&
event
,
zmq_msg_data
(
&
msg
),
sizeof
(
event
));
switch
(
event
.
event
)
{
case
ZMQ_EVENT_CONNECTED
:
assert
(
event
.
data
.
connected
.
fd
>
0
);
assert
(
!
strcmp
(
event
.
data
.
connected
.
addr
,
addr
));
req2_socket_events
|=
ZMQ_EVENT_CONNECTED
;
break
;
case
ZMQ_EVENT_CLOSED
:
assert
(
event
.
data
.
closed
.
fd
!=
0
);
assert
(
!
strcmp
(
event
.
data
.
closed
.
addr
,
addr
));
req2_socket_events
|=
ZMQ_EVENT_CLOSED
;
break
;
}
}
zmq_close
(
s
);
return
NULL
;
}
}
extern
"C"
{
// REP socket monitor thread
static
void
*
rep_socket_monitor
(
void
*
ctx
)
{
zmq_event_t
event
;
int
rc
;
void
*
s
=
zmq_socket
(
ctx
,
ZMQ_PAIR
);
assert
(
s
);
rc
=
zmq_connect
(
s
,
"inproc://monitor.rep"
);
assert
(
rc
==
0
);
while
(
true
)
{
zmq_msg_t
msg
;
zmq_msg_init
(
&
msg
);
rc
=
zmq_recvmsg
(
s
,
&
msg
,
0
);
if
(
rc
==
-
1
&&
zmq_errno
()
==
ETERM
)
break
;
assert
(
rc
!=
-
1
);
memcpy
(
&
event
,
zmq_msg_data
(
&
msg
),
sizeof
(
event
));
switch
(
event
.
event
)
{
case
ZMQ_EVENT_LISTENING
:
assert
(
event
.
data
.
listening
.
fd
>
0
);
assert
(
!
strcmp
(
event
.
data
.
listening
.
addr
,
addr
));
rep_socket_events
|=
ZMQ_EVENT_LISTENING
;
break
;
case
ZMQ_EVENT_ACCEPTED
:
assert
(
event
.
data
.
accepted
.
fd
>
0
);
assert
(
!
strcmp
(
event
.
data
.
accepted
.
addr
,
addr
));
rep_socket_events
|=
ZMQ_EVENT_ACCEPTED
;
break
;
case
ZMQ_EVENT_CLOSE_FAILED
:
assert
(
event
.
data
.
close_failed
.
err
!=
0
);
assert
(
!
strcmp
(
event
.
data
.
close_failed
.
addr
,
addr
));
rep_socket_events
|=
ZMQ_EVENT_CLOSE_FAILED
;
break
;
case
ZMQ_EVENT_CLOSED
:
assert
(
event
.
data
.
closed
.
fd
!=
0
);
assert
(
!
strcmp
(
event
.
data
.
closed
.
addr
,
addr
));
rep_socket_events
|=
ZMQ_EVENT_CLOSED
;
break
;
case
ZMQ_EVENT_DISCONNECTED
:
assert
(
event
.
data
.
disconnected
.
fd
!=
0
);
assert
(
!
strcmp
(
event
.
data
.
disconnected
.
addr
,
addr
));
rep_socket_events
|=
ZMQ_EVENT_DISCONNECTED
;
break
;
}
zmq_msg_close
(
&
msg
);
}
zmq_close
(
s
);
return
NULL
;
}
}
int
main
(
void
)
{
int
rc
;
void
*
req
;
void
*
req2
;
void
*
rep
;
pthread_t
threads
[
3
];
addr
=
"tcp://127.0.0.1:5560"
;
// Create the infrastructure
void
*
ctx
=
zmq_init
(
1
);
assert
(
ctx
);
// set socket monitor
rc
=
zmq_ctx_set_monitor
(
ctx
,
socket_monitor
);
assert
(
rc
==
0
);
// REP socket
rep
=
zmq_socket
(
ctx
,
ZMQ_REP
);
assert
(
rep
);
rc
=
zmq_bind
(
rep
,
"tcp://127.0.0.1:5560"
);
// Assert supported protocols
rc
=
zmq_socket_monitor
(
rep
,
addr
,
0
);
assert
(
rc
==
-
1
);
assert
(
zmq_errno
()
==
EPROTONOSUPPORT
);
// Deregister monitor
rc
=
zmq_socket_monitor
(
rep
,
NULL
,
0
);
assert
(
rc
==
0
);
// REP socket monitor, all events
rc
=
zmq_socket_monitor
(
rep
,
"inproc://monitor.rep"
,
ZMQ_EVENT_ALL
);
assert
(
rc
==
0
);
rc
=
pthread_create
(
&
threads
[
0
],
NULL
,
rep_socket_monitor
,
ctx
);
assert
(
rc
==
0
);
rc
=
zmq_bind
(
rep
,
addr
);
assert
(
rc
==
0
);
// REQ socket
req
=
zmq_socket
(
ctx
,
ZMQ_REQ
);
assert
(
req
);
rc
=
zmq_connect
(
req
,
"tcp://127.0.0.1:5560"
);
// REQ socket monitor, all events
rc
=
zmq_socket_monitor
(
req
,
"inproc://monitor.req"
,
ZMQ_EVENT_ALL
);
assert
(
rc
==
0
);
rc
=
pthread_create
(
&
threads
[
1
],
NULL
,
req_socket_monitor
,
ctx
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
req
,
addr
);
assert
(
rc
==
0
);
// 2nd REQ socket
req2
=
zmq_socket
(
ctx
,
ZMQ_REQ
);
assert
(
req2
);
// 2nd REQ socket monitor, connected event only
rc
=
zmq_socket_monitor
(
req2
,
"inproc://monitor.req2"
,
ZMQ_EVENT_CONNECTED
);
assert
(
rc
==
0
);
rc
=
pthread_create
(
&
threads
[
2
],
NULL
,
req2_socket_monitor
,
ctx
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
req2
,
addr
);
assert
(
rc
==
0
);
bounce
(
rep
,
req
);
...
...
@@ -111,26 +248,41 @@ int main (void)
// Allow a window for socket events as connect can be async
zmq_sleep
(
1
);
//
Deallocate the infrastructure.
rc
=
zmq_close
(
re
q
);
//
Close the REP socket
rc
=
zmq_close
(
re
p
);
assert
(
rc
==
0
);
// Allow
for closed or disconnected events to bubble up
// Allow
some time for detecting error states
zmq_sleep
(
1
);
rc
=
zmq_close
(
rep
);
// Close the REQ socket
rc
=
zmq_close
(
req
);
assert
(
rc
==
0
);
// Close the 2nd REQ socket
rc
=
zmq_close
(
req2
);
assert
(
rc
==
0
);
// Allow for closed or disconnected events to bubble up
zmq_sleep
(
1
);
zmq_term
(
ctx
);
// We expect to at least observe these events
assert
(
events
&
ZMQ_EVENT_LISTENING
);
assert
(
events
&
ZMQ_EVENT_ACCEPTED
);
assert
(
events
&
ZMQ_EVENT_CONNECTED
);
assert
(
events
&
ZMQ_EVENT_CLOSED
);
assert
(
events
&
ZMQ_EVENT_DISCONNECTED
);
// Expected REP socket events
assert
(
rep_socket_events
&
ZMQ_EVENT_LISTENING
);
assert
(
rep_socket_events
&
ZMQ_EVENT_ACCEPTED
);
assert
(
rep_socket_events
&
ZMQ_EVENT_CLOSED
);
// Expected REQ socket events
assert
(
req_socket_events
&
ZMQ_EVENT_CONNECTED
);
assert
(
req_socket_events
&
ZMQ_EVENT_DISCONNECTED
);
assert
(
req_socket_events
&
ZMQ_EVENT_CLOSED
);
// Expected 2nd REQ socket events
assert
(
req2_socket_events
&
ZMQ_EVENT_CONNECTED
);
assert
(
!
(
req2_socket_events
&
ZMQ_EVENT_CLOSED
));
pthread_exit
(
NULL
);
return
0
;
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment