Commit 1810f102 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #529 from guidog/master

Adapted zmq_socket_monitor man page to new API.
parents ba2dda40 d78d4972
...@@ -17,168 +17,100 @@ DESCRIPTION ...@@ -17,168 +17,100 @@ DESCRIPTION
----------- -----------
The _zmq_socket_monitor()_ function shall spawn a 'PAIR' socket that publishes The _zmq_socket_monitor()_ function shall spawn a 'PAIR' socket that publishes
socket state changes (events) over the inproc:// transport to a given endpoint. socket state changes (events) over the inproc:// transport to a given endpoint.
Messages are 'zmq_event_t' structs. It's recommended to connect via a 'PAIR'
socket in another application thread and handle monitoring events there. It's
possible to also supply a bitmask ('ZMQ_EVENT_ALL' or any combination of the
'ZMQ_EVENT_*' constants) of the events you're interested in.
---- Messages consist of 2 Frames, the first containing the event-id and the
// monitoring thread associated value. The second frame holds the affected endpoint as string.
static void *req_socket_monitor (void *ctx)
{
zmq_event_t event;
int rc;
void *s = zmq_socket (ctx, ZMQ_PAIR); The layout of the first Frame is:
assert (s); 16 bit event id
32 bit event value
rc = zmq_connect (s, "inproc://monitor.req"); event id and value are in the native byte order (for the machine the
assert (rc == 0); application is running on). There is no padding between the fields.
while (true) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_msg_recv (s, &msg, 0);
if (rc == -1 && zmq_errno() == ETERM) break;
assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
switch (event.event) {
case ZMQ_EVENT_CONNECTED:
// handle socket connected event
break;
case ZMQ_EVENT_CLOSED:
// handle socket closed event
break;
}
}
zmq_close (s);
return NULL;
}
// register a monitor endpoint for all socket events The event value has to be interpreted in the context of the event id.
rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL); See 'Supported events' below for details.
assert (rc == 0);
// spawn a monitoring thread
rc = pthread_create (&threads [0], NULL, req_socket_monitor, ctx);
assert (rc == 0);
---- ----
Only connection oriented (tcp and ipc) transports are supported in this initial Only connection oriented (tcp and ipc) transports are supported in this initial
implementation. implementation.
Supported events are: ----
Supported events
----------------
ZMQ_EVENT_CONNECTED: connection established ZMQ_EVENT_CONNECTED: connection established
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CONNECTED' event triggers when a connection has been established The 'ZMQ_EVENT_CONNECTED' event triggers when a connection has been established
to a remote peer. This can happen either synchronous or asynchronous. to a remote peer. This can happen either synchronous or asynchronous.
value is the FD of the newly connected socket.
.Event metadata:
----
data.connected.addr // peer address
data.connected.fd // socket descriptor
----
ZMQ_EVENT_CONNECT_DELAYED: synchronous connect failed, it's being polled ZMQ_EVENT_CONNECT_DELAYED: synchronous connect failed, it's being polled
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CONNECT_DELAYED' event triggers when an immediate connection The 'ZMQ_EVENT_CONNECT_DELAYED' event triggers when an immediate connection
attempt is delayed and it's completion's being polled for. attempt is delayed and it's completion's being polled for.
value has no meaning.
.Event metadata:
----
data.connect_delayed.addr // peer address
data.connect_delayed.err // errno value
----
ZMQ_EVENT_CONNECT_RETRIED: asynchronous connect / reconnection attempt ZMQ_EVENT_CONNECT_RETRIED: asynchronous connect / reconnection attempt
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CONNECT_RETRIED' event triggers when a connection attempt The 'ZMQ_EVENT_CONNECT_RETRIED' event triggers when a connection attempt
is being handled by reconnect timer. The reconnect interval's recomputed is being handled by reconnect timer. The reconnect interval's recomputed
for each attempt. for each attempt.
value is the reconnect interval.
.Event metadata:
----
data.connect_retried.addr // peer address
data.connect_retried.interval // computed reconnect interval
----
ZMQ_EVENT_LISTENING: socket bound to an address, ready to accept connections ZMQ_EVENT_LISTENING: socket bound to an address, ready to accept connections
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_LISTENING' event triggers when a socket's successfully bound The 'ZMQ_EVENT_LISTENING' event triggers when a socket's successfully bound
to a an interface. to a an interface.
value is the FD of the newly bound socket.
.Event metadata:
----
data.listening.addr // listen address
data.listening.fd // socket descriptor
----
ZMQ_EVENT_BIND_FAILED: socket could not bind to an address ZMQ_EVENT_BIND_FAILED: socket could not bind to an address
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_BIND_FAILED' event triggers when a socket could not bind to The 'ZMQ_EVENT_BIND_FAILED' event triggers when a socket could not bind to
a given interface. a given interface.
value is the errno generated by the bind call.
.Event metadata:
----
data.bind_failed.addr // listen address
data.bind_failed.err // errno value
----
ZMQ_EVENT_ACCEPTED: connection accepted to bound interface ZMQ_EVENT_ACCEPTED: connection accepted to bound interface
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_ACCEPTED' event triggers when a connection from a remote peer The 'ZMQ_EVENT_ACCEPTED' event triggers when a connection from a remote peer
has been established with a socket's listen address. has been established with a socket's listen address.
value is the FD of the accepted socket.
.Event metadata:
----
data.accepted.addr // listen address
data.accepted.fd // socket descriptor
----
ZMQ_EVENT_ACCEPT_FAILED: could not accept client connection ZMQ_EVENT_ACCEPT_FAILED: could not accept client connection
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_ACCEPT_FAILED' event triggers when a connection attempt to The 'ZMQ_EVENT_ACCEPT_FAILED' event triggers when a connection attempt to
a socket's bound address fails. a socket's bound address fails.
value is the errno generated by accept.
.Event metadata:
----
data.accept_failed.addr // listen address
data.accept_failed.err // errno value
----
ZMQ_EVENT_CLOSED: connection closed ZMQ_EVENT_CLOSED: connection closed
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CLOSED' event triggers when a connection's underlying descriptor The 'ZMQ_EVENT_CLOSED' event triggers when a connection's underlying descriptor
has been closed. has been closed.
value is the former FD of the for the closed socket. FD has been closed already!
.Event metadata:
----
data.closed.addr // address
data.closed.fd // socket descriptor
----
ZMQ_EVENT_CLOSE_FAILED: connection couldn't be closed ZMQ_EVENT_CLOSE_FAILED: connection couldn't be closed
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CLOSE_FAILED' event triggers when a descriptor could not be The 'ZMQ_EVENT_CLOSE_FAILED' event triggers when a descriptor could not be
released back to the OS. released back to the OS. Implementation note: ONLY FOR IPC SOCKETS.
value is the errno generated by unlink.
.Event metadata:
----
data.close_failed.addr // address
data.close_failed.err // errno value
----
ZMQ_EVENT_DISCONNECTED: broken session ZMQ_EVENT_DISCONNECTED: broken session
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_DISCONNECTED' event triggers when the stream engine (tcp and ipc The 'ZMQ_EVENT_DISCONNECTED' event triggers when the stream engine (tcp and ipc
specific) detects a corrupted / broken session. specific) detects a corrupted / broken session.
value is the FD of the socket.
.Event metadata:
----
data.disconnected.addr // address
data.disconnected.fd // socket descriptor
----
RETURN VALUE RETURN VALUE
------------ ------------
...@@ -203,78 +135,116 @@ EXAMPLE ...@@ -203,78 +135,116 @@ EXAMPLE
------- -------
.Observing a 'REP' socket's connection state .Observing a 'REP' socket's connection state
---- ----
#include <stdio.h>
#include <zmq.h>
#include <pthread.h>
#include <string.h>
#include <assert.h>
static int read_msg(void* s, zmq_event_t* event, char* ep)
{
int rc ;
zmq_msg_t msg1; // binary part
zmq_msg_init (&msg1);
zmq_msg_t msg2; // address part
zmq_msg_init (&msg2);
rc = zmq_msg_recv (&msg1, s, 0);
if (rc == -1 && zmq_errno() == ETERM)
return 1 ;
assert (rc != -1);
assert (zmq_msg_more(&msg1) != 0);
rc = zmq_msg_recv (&msg2, s, 0);
if (rc == -1 && zmq_errno() == ETERM)
return 1;
assert (rc != -1);
assert (zmq_msg_more(&msg2) == 0);
// copy binary data to event struct
const char* data = (char*)zmq_msg_data(&msg1);
memcpy(&(event->event), data, sizeof(event->event));
memcpy(&(event->value), data+sizeof(event->event), sizeof(event->value));
// copy address part
const size_t len = zmq_msg_size(&msg2) ;
ep = memcpy(ep, zmq_msg_data(&msg2), len);
*(ep + len) = 0 ;
return 0 ;
}
// REP socket monitor thread // REP socket monitor thread
static void *rep_socket_monitor (void *ctx) static void *rep_socket_monitor (void *ctx)
{ {
zmq_event_t event; zmq_event_t event;
static char addr[1025] ;
int rc; int rc;
printf("starting monitor...\n");
void *s = zmq_socket (ctx, ZMQ_PAIR); void *s = zmq_socket (ctx, ZMQ_PAIR);
assert (s); assert (s);
rc = zmq_connect (s, "inproc://monitor.rep"); rc = zmq_connect (s, "inproc://monitor.rep");
assert (rc == 0); assert (rc == 0);
while (true) { while (!read_msg(s, &event, addr)) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_msg_recv (s, &msg, 0);
if (rc == -1 && zmq_errno() == ETERM) break;
assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
switch (event.event) { switch (event.event) {
case ZMQ_EVENT_LISTENING: case ZMQ_EVENT_LISTENING:
printf ("listening socket descriptor %d\n", event.data.listening.fd); printf ("listening socket descriptor %d\n", event.value);
printf ("listening socket address %s\n", event.data.listening.addr); printf ("listening socket address %s\n", addr);
break; break;
case ZMQ_EVENT_ACCEPTED: case ZMQ_EVENT_ACCEPTED:
printf ("accepted socket descriptor %d\n", event.data.accepted.fd); printf ("accepted socket descriptor %d\n", event.value);
printf ("accepted socket address %s\n", event.data.accepted.addr); printf ("accepted socket address %s\n", addr);
break; break;
case ZMQ_EVENT_CLOSE_FAILED: case ZMQ_EVENT_CLOSE_FAILED:
printf ("socket close failure error code %d\n", event.data.close_failed.err); printf ("socket close failure error code %d\n", event.value);
printf ("socket address %s\n", event.data.close_failed.addr); printf ("socket address %s\n", addr);
break; break;
case ZMQ_EVENT_CLOSED: case ZMQ_EVENT_CLOSED:
printf ("closed socket descriptor %d\n", event.data.closed.fd); printf ("closed socket descriptor %d\n", event.value);
printf ("closed socket address %s\n", event.data.closed.addr); printf ("closed socket address %s\n", addr);
break; break;
case ZMQ_EVENT_DISCONNECTED: case ZMQ_EVENT_DISCONNECTED:
printf ("disconnected socket descriptor %d\n", event.data.disconnected.fd); printf ("disconnected socket descriptor %d\n", event.value);
printf ("disconnected socket address %s\n", event.data.disconnected.addr); printf ("disconnected socket address %s\n", addr);
break; break;
} }
zmq_msg_close (&msg);
} }
zmq_close (s); zmq_close (s);
return NULL; return NULL;
} }
// Create the infrastructure int main()
void *ctx = zmq_init (1); {
assert (ctx); const char* addr = "tcp://127.0.0.1:6666" ;
pthread_t thread ;
// Create the infrastructure
void *ctx = zmq_init (1);
assert (ctx);
// REP socket
void* rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
// REP socket // REP socket monitor, all events
rep = zmq_socket (ctx, ZMQ_REP); int rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);
assert (rep); assert (rc == 0);
rc = pthread_create (&thread, NULL, rep_socket_monitor, ctx);
assert (rc == 0);
// REP socket monitor, all events rc = zmq_bind (rep, addr);
rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL); assert (rc == 0);
assert (rc == 0);
rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx);
assert (rc == 0);
rc = zmq_bind (rep, addr); // Allow some time for event detection
assert (rc == 0); zmq_sleep (1);
// Allow some time for event detection // Close the REP socket
zmq_sleep (1); rc = zmq_close (rep);
assert (rc == 0);
// Close the REP socket zmq_term (ctx);
rc = zmq_close (rep);
assert (rc == 0);
zmq_term (ctx); return 0 ;
}
---- ----
...@@ -286,3 +256,4 @@ linkzmq:zmq[7] ...@@ -286,3 +256,4 @@ linkzmq:zmq[7]
AUTHORS AUTHORS
------- -------
This 0MQ manual page was written by Lourens Naudé <lourens@methodmissing.com> This 0MQ manual page was written by Lourens Naudé <lourens@methodmissing.com>
Changes by Guido Goldstein <github@a-nugget.de>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment