Commit 2d6f5b0e authored by Ian Barber's avatar Ian Barber

Merge pull request #985 from hintjens/master

Cleaned up socket monitor code, tests, and man page
parents 50afebe4 9753de85
...@@ -102,6 +102,7 @@ Access message content:: ...@@ -102,6 +102,7 @@ Access message content::
linkzmq:zmq_msg_more[3] linkzmq:zmq_msg_more[3]
Work with message properties:: Work with message properties::
linkzmq:zmq_msg_gets[3]
linkzmq:zmq_msg_get[3] linkzmq:zmq_msg_get[3]
linkzmq:zmq_msg_set[3] linkzmq:zmq_msg_set[3]
......
zmq_ctx_socket_monitor(3) zmq_socket_monitor(3)
========================= =====================
NAME NAME
---- ----
zmq_socket_monitor - register a monitoring callback zmq_socket_monitor - monitor socket events
SYNOPSIS SYNOPSIS
-------- --------
*int zmq_socket_monitor (void '*socket', char * '*addr', int 'events');* *int zmq_socket_monitor (void '*socket', char '*endpoint', int 'events');*
DESCRIPTION DESCRIPTION
----------- -----------
The _zmq_socket_monitor()_ function shall spawn a 'PAIR' socket that publishes The _zmq_socket_monitor()_ method lets an application thread track
socket state changes (events) over the inproc:// transport to a given endpoint. socket events (like connects) on a ZeroMQ socket. Each call to this
method creates a 'ZMQ_PAIR' socket and binds that to the specified
inproc:// 'endpoint'. To collect the socket events, you must create
your own 'ZMQ_PAIR' socket, and connect that to the endpoint.
Messages consist of 2 Frames, the first containing the event-id and the The 'events' argument is a bitmask of the socket events you wish to
associated value. The second frame holds the affected endpoint as string. monitor, see 'Supported events' below. To monitor all events, use the
event value ZMQ_EVENT_ALL.
The layout of the first Frame is: Each event is sent as two frames. The first frame contains an event
16 bit event id number (16 bits), and an event value (32 bits) that provides additional
32 bit event value data according to the event number. The second frame contains a string
that specifies the affected TCP or IPC endpoint.
event id and value are in the native byte order (for the machine the
application is running on). There is no padding between the fields.
The event value has to be interpreted in the context of the event id.
See 'Supported events' below for details.
---- ----
The _zmq_socket_monitor()_ method supports only connection-oriented
Only connection oriented (tcp and ipc) transports are supported in this initial transports, that is, TCP, IPC, and TIPC.
implementation.
---- ----
Supported events Supported events
---------------- ----------------
ZMQ_EVENT_CONNECTED: connection established ZMQ_EVENT_CONNECTED
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CONNECTED' event triggers when a connection has been established The socket has successfully connected to a remote peer. The event value
to a remote peer. This can happen either synchronous or asynchronous. is the file descriptor (FD) of the underlying network socket. Warning:
Value is the FD of the newly connected socket. there is no guarantee that the FD is still valid by the time your code
receives this event.
ZMQ_EVENT_CONNECT_DELAYED: synchronous connect failed, it's being polled ZMQ_EVENT_CONNECT_DELAYED
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CONNECT_DELAYED' event triggers when an immediate connection A connect request on the socket is pending. The event value is unspecified.
attempt is delayed and its completion is being polled for.
Value has no meaning. ZMQ_EVENT_CONNECT_RETRIED
~~~~~~~~~~~~~~~~~~~~~~~~~
A connect request failed, and is now being retried. The event value is the
ZMQ_EVENT_CONNECT_RETRIED: asynchronous connect / reconnection attempt reconnect interval in milliseconds. Note that the reconnect interval is
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ recalculated at each retry.
The 'ZMQ_EVENT_CONNECT_RETRIED' event triggers when a connection attempt
is being handled by reconnect timer. The reconnect interval's recomputed ZMQ_EVENT_LISTENING
for each attempt. ~~~~~~~~~~~~~~~~~~~
Value is the reconnect interval. The socket was successfully bound to a network interface. The event value
is the FD of the underlying network socket. Warning: there is no guarantee
that the FD is still valid by the time your code receives this event.
ZMQ_EVENT_LISTENING: socket bound to an address, ready to accept connections
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ZMQ_EVENT_BIND_FAILED
The 'ZMQ_EVENT_LISTENING' event triggers when a socket's successfully bound ~~~~~~~~~~~~~~~~~~~~~
to a an interface. The socket could not bind to a given interface. The event value is the
Value is the FD of the newly bound socket. errno generated by the system bind call.
ZMQ_EVENT_ACCEPTED
ZMQ_EVENT_BIND_FAILED: socket could not bind to an address ~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The socket has accepted a connection from a remote peer. The event value is
The 'ZMQ_EVENT_BIND_FAILED' event triggers when a socket could not bind to the FD of the underlying network socket. Warning: there is no guarantee that
a given interface. the FD is still valid by the time your code receives this event.
Value is the errno generated by the bind call.
ZMQ_EVENT_ACCEPT_FAILED
~~~~~~~~~~~~~~~~~~~~~~~
ZMQ_EVENT_ACCEPTED: connection accepted to bound interface The socket has rejected a connection from a remote peer. The event value is
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ the errno generated by the accept call.
The 'ZMQ_EVENT_ACCEPTED' event triggers when a connection from a remote peer
has been established with a socket's listen address. ZMQ_EVENT_CLOSED
Value is the FD of the accepted socket. ~~~~~~~~~~~~~~~~
The socket was closed. The event value is the FD of the (now closed) network
socket.
ZMQ_EVENT_ACCEPT_FAILED: could not accept client connection
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ZMQ_EVENT_CLOSE_FAILED
The 'ZMQ_EVENT_ACCEPT_FAILED' event triggers when a connection attempt to ~~~~~~~~~~~~~~~~~~~~~~
a socket's bound address fails. The socket close failed. The event value is the errno returned by the system
Value is the errno generated by accept. call. Note that this event occurs only on IPC transports.
ZMQ_EVENT_DISCONNECTED
ZMQ_EVENT_CLOSED: connection closed ~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The socket was disconnected unexpectedly. The event value is the FD of the
The 'ZMQ_EVENT_CLOSED' event triggers when a connection's underlying descriptor underlying network socket. Warning: this socket will be closed.
has been closed.
Value is the former FD of the for the closed socket. FD has been closed already! ZMQ_EVENT_MONITOR_STOPPED
~~~~~~~~~~~~~~~~~~~~~~~~~
Monitoring on this socket ended.
ZMQ_EVENT_CLOSE_FAILED: connection couldn't be closed
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CLOSE_FAILED' event triggers when a descriptor could not be
released back to the OS. Implementation note: ONLY FOR IPC SOCKETS.
Value is the errno generated by unlink.
ZMQ_EVENT_DISCONNECTED: broken session
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_DISCONNECTED' event triggers when the stream engine (tcp and ipc
specific) detects a corrupted / broken session.
Value is the FD of the socket.
RETURN VALUE RETURN VALUE
...@@ -133,115 +118,110 @@ The endpoint supplied is invalid. ...@@ -133,115 +118,110 @@ The endpoint supplied is invalid.
EXAMPLE EXAMPLE
------- -------
.Observing a 'REP' socket's connection state .Monitoring client and server sockets
---- ----
#include <stdio.h> // Read one event off the monitor socket; return value and address
#include <zmq.h> // by reference, if not null, and event number by value. Returns -1
#include <pthread.h> // in case of error.
#include <string.h>
#include <assert.h>
static int read_msg(void* s, zmq_event_t* event, char* ep) static int
get_monitor_event (void *monitor, int *value, char **address)
{ {
int rc ; // First frame in message contains event number and value
zmq_msg_t msg1; // binary part zmq_msg_t msg;
zmq_msg_init (&msg1); zmq_msg_init (&msg);
zmq_msg_t msg2; // address part if (zmq_msg_recv (&msg, monitor, 0) == -1)
zmq_msg_init (&msg2); return -1; // Interruped, presumably
rc = zmq_msg_recv (&msg1, s, 0); assert (zmq_msg_more (&msg));
if (rc == -1 && zmq_errno() == ETERM)
return 1 ; uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
assert (rc != -1); uint16_t event = *(uint16_t *) (data);
assert (zmq_msg_more(&msg1) != 0); if (value)
rc = zmq_msg_recv (&msg2, s, 0); *value = *(uint32_t *) (data + 2);
if (rc == -1 && zmq_errno() == ETERM)
return 1; // Second frame in message contains event address
assert (rc != -1); zmq_msg_init (&msg);
assert (zmq_msg_more(&msg2) == 0); if (zmq_msg_recv (&msg, monitor, 0) == -1)
// copy binary data to event struct return -1; // Interruped, presumably
const char* data = (char*)zmq_msg_data(&msg1); assert (!zmq_msg_more (&msg));
memcpy(&(event->event), data, sizeof(event->event));
memcpy(&(event->value), data+sizeof(event->event), sizeof(event->value)); if (address) {
// copy address part uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
const size_t len = zmq_msg_size(&msg2) ; size_t size = zmq_msg_size (&msg);
ep = memcpy(ep, zmq_msg_data(&msg2), len); *address = (char *) malloc (size + 1);
*(ep + len) = 0 ; memcpy (*address, data, size);
return 0 ; *address [size] = 0;
}
return event;
} }
// REP socket monitor thread int main (void)
static void *rep_socket_monitor (void *ctx)
{ {
zmq_event_t event; void *ctx = zmq_ctx_new ();
static char addr[1025] ; assert (ctx);
int rc;
printf("starting monitor...\n"); // We'll monitor these two sockets
void *client = zmq_socket (ctx, ZMQ_DEALER);
assert (client);
void *server = zmq_socket (ctx, ZMQ_DEALER);
assert (server);
void *s = zmq_socket (ctx, ZMQ_PAIR); // Socket monitoring only works over inproc://
assert (s); int rc = zmq_socket_monitor (client, "tcp://127.0.0.1:9999", 0);
assert (rc == -1);
assert (zmq_errno () == EPROTONOSUPPORT);
rc = zmq_connect (s, "inproc://monitor.rep"); // Monitor all events on client and server sockets
rc = zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL);
assert (rc == 0);
rc = zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL);
assert (rc == 0); assert (rc == 0);
while (!read_msg(s, &event, addr)) {
switch (event.event) {
case ZMQ_EVENT_LISTENING:
printf ("listening socket descriptor %d\n", event.value);
printf ("listening socket address %s\n", addr);
break;
case ZMQ_EVENT_ACCEPTED:
printf ("accepted socket descriptor %d\n", event.value);
printf ("accepted socket address %s\n", addr);
break;
case ZMQ_EVENT_CLOSE_FAILED:
printf ("socket close failure error code %d\n", event.value);
printf ("socket address %s\n", addr);
break;
case ZMQ_EVENT_CLOSED:
printf ("closed socket descriptor %d\n", event.value);
printf ("closed socket address %s\n", addr);
break;
case ZMQ_EVENT_DISCONNECTED:
printf ("disconnected socket descriptor %d\n", event.value);
printf ("disconnected socket address %s\n", addr);
break;
}
}
zmq_close (s);
return NULL;
}
int main()
{
const char* addr = "tcp://127.0.0.1:6666" ;
pthread_t thread ;
// Create the infrastructure
void *ctx = zmq_init (1);
assert (ctx);
// REP socket // Create two sockets for collecting monitor events
void* rep = zmq_socket (ctx, ZMQ_REP); void *client_mon = zmq_socket (ctx, ZMQ_PAIR);
assert (rep); assert (client_mon);
void *server_mon = zmq_socket (ctx, ZMQ_PAIR);
assert (server_mon);
// REP socket monitor, all events // Connect these to the inproc endpoints so they'll get events
int rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL); rc = zmq_connect (client_mon, "inproc://monitor-client");
assert (rc == 0); assert (rc == 0);
rc = pthread_create (&thread, NULL, rep_socket_monitor, ctx); rc = zmq_connect (server_mon, "inproc://monitor-server");
assert (rc == 0); assert (rc == 0);
rc = zmq_bind (rep, addr); // Now do a basic ping test
rc = zmq_bind (server, "tcp://127.0.0.1:9998");
assert (rc == 0); assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:9998");
// Allow some time for event detection
zmq_sleep (1);
// Close the REP socket
rc = zmq_close (rep);
assert (rc == 0); assert (rc == 0);
bounce (client, server);
zmq_term (ctx);
// Close client and server
close_zero_linger (client);
close_zero_linger (server);
// Now collect and check events from both sockets
int event = get_monitor_event (client_mon, NULL, NULL);
if (event == ZMQ_EVENT_CONNECT_DELAYED)
event = get_monitor_event (client_mon, NULL, NULL);
assert (event == ZMQ_EVENT_CONNECTED);
event = get_monitor_event (client_mon, NULL, NULL);
assert (event == ZMQ_EVENT_MONITOR_STOPPED);
// This is the flow of server events
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_LISTENING);
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_ACCEPTED);
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_CLOSED);
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_MONITOR_STOPPED);
// Close down the sockets
close_zero_linger (client_mon);
close_zero_linger (server_mon);
zmq_ctx_term (ctx);
return 0 ; return 0 ;
} }
......
...@@ -326,34 +326,20 @@ ZMQ_EXPORT char *zmq_msg_gets (zmq_msg_t *msg, char *property); ...@@ -326,34 +326,20 @@ ZMQ_EXPORT char *zmq_msg_gets (zmq_msg_t *msg, char *property);
/* 0MQ socket events and monitoring */ /* 0MQ socket events and monitoring */
/******************************************************************************/ /******************************************************************************/
/* Socket transport events (tcp and ipc only) */ /* Socket transport events (TCP and IPC only) */
#define ZMQ_EVENT_CONNECTED 1
#define ZMQ_EVENT_CONNECT_DELAYED 2 #define ZMQ_EVENT_CONNECTED 0x0001
#define ZMQ_EVENT_CONNECT_RETRIED 4 #define ZMQ_EVENT_CONNECT_DELAYED 0x0002
#define ZMQ_EVENT_CONNECT_RETRIED 0x0004
#define ZMQ_EVENT_LISTENING 8 #define ZMQ_EVENT_LISTENING 0x0008
#define ZMQ_EVENT_BIND_FAILED 16 #define ZMQ_EVENT_BIND_FAILED 0x0010
#define ZMQ_EVENT_ACCEPTED 0x0020
#define ZMQ_EVENT_ACCEPTED 32 #define ZMQ_EVENT_ACCEPT_FAILED 0x0040
#define ZMQ_EVENT_ACCEPT_FAILED 64 #define ZMQ_EVENT_CLOSED 0x0080
#define ZMQ_EVENT_CLOSE_FAILED 0x0100
#define ZMQ_EVENT_CLOSED 128 #define ZMQ_EVENT_DISCONNECTED 0x0200
#define ZMQ_EVENT_CLOSE_FAILED 256 #define ZMQ_EVENT_MONITOR_STOPPED 0x0400
#define ZMQ_EVENT_DISCONNECTED 512 #define ZMQ_EVENT_ALL 0xFFFF
#define ZMQ_EVENT_MONITOR_STOPPED 1024
#define ZMQ_EVENT_ALL ( ZMQ_EVENT_CONNECTED | ZMQ_EVENT_CONNECT_DELAYED | \
ZMQ_EVENT_CONNECT_RETRIED | ZMQ_EVENT_LISTENING | \
ZMQ_EVENT_BIND_FAILED | ZMQ_EVENT_ACCEPTED | \
ZMQ_EVENT_ACCEPT_FAILED | ZMQ_EVENT_CLOSED | \
ZMQ_EVENT_CLOSE_FAILED | ZMQ_EVENT_DISCONNECTED | \
ZMQ_EVENT_MONITOR_STOPPED)
/* Socket event data */
typedef struct {
uint16_t event; // id of the event as bitfield
int32_t value ; // value is either error code, fd or reconnect interval
} zmq_event_t;
ZMQ_EXPORT void *zmq_socket (void *, int type); ZMQ_EXPORT void *zmq_socket (void *, int type);
ZMQ_EXPORT int zmq_close (void *s); ZMQ_EXPORT int zmq_close (void *s);
......
...@@ -1169,22 +1169,19 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) ...@@ -1169,22 +1169,19 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
int zmq::socket_base_t::monitor (const char *addr_, int events_) int zmq::socket_base_t::monitor (const char *addr_, int events_)
{ {
int rc;
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
return -1; return -1;
} }
// Support deregistering monitoring endpoints as well // Support deregistering monitoring endpoints as well
if (addr_ == NULL) { if (addr_ == NULL) {
stop_monitor (); stop_monitor ();
return 0; return 0;
} }
// Parse addr_ string. // Parse addr_ string.
std::string protocol; std::string protocol;
std::string address; std::string address;
rc = parse_uri (addr_, protocol, address); int rc = parse_uri (addr_, protocol, address);
if (rc != 0) if (rc != 0)
return -1; return -1;
...@@ -1197,7 +1194,6 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_) ...@@ -1197,7 +1194,6 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
errno = EPROTONOSUPPORT; errno = EPROTONOSUPPORT;
return -1; return -1;
} }
// Register events to monitor // Register events to monitor
monitor_events = events_; monitor_events = events_;
monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR); monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
...@@ -1229,134 +1225,88 @@ zmq::fd_t zmq::socket_base_t::fd() ...@@ -1229,134 +1225,88 @@ zmq::fd_t zmq::socket_base_t::fd()
void zmq::socket_base_t::event_connected (std::string &addr_, int fd_) void zmq::socket_base_t::event_connected (std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_CONNECTED) { if (monitor_events & ZMQ_EVENT_CONNECTED)
zmq_event_t event; monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_);
event.event = ZMQ_EVENT_CONNECTED;
event.value = fd_;
monitor_event (event, addr_);
}
} }
void zmq::socket_base_t::event_connect_delayed (std::string &addr_, int err_) void zmq::socket_base_t::event_connect_delayed (std::string &addr_, int err_)
{ {
if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) { if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED)
zmq_event_t event; monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_);
event.event = ZMQ_EVENT_CONNECT_DELAYED;
event.value = err_;
monitor_event (event, addr_);
}
} }
void zmq::socket_base_t::event_connect_retried (std::string &addr_, int interval_) void zmq::socket_base_t::event_connect_retried (std::string &addr_, int interval_)
{ {
if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) { if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED)
zmq_event_t event; monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_);
event.event = ZMQ_EVENT_CONNECT_RETRIED;
event.value = interval_;
monitor_event (event, addr_);
}
} }
void zmq::socket_base_t::event_listening (std::string &addr_, int fd_) void zmq::socket_base_t::event_listening (std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_LISTENING) { if (monitor_events & ZMQ_EVENT_LISTENING)
zmq_event_t event; monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_);
event.event = ZMQ_EVENT_LISTENING;
event.value = fd_;
monitor_event (event, addr_);
}
} }
void zmq::socket_base_t::event_bind_failed (std::string &addr_, int err_) void zmq::socket_base_t::event_bind_failed (std::string &addr_, int err_)
{ {
if (monitor_events & ZMQ_EVENT_BIND_FAILED) { if (monitor_events & ZMQ_EVENT_BIND_FAILED)
zmq_event_t event; monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_);
event.event = ZMQ_EVENT_BIND_FAILED;
event.value = err_;
monitor_event (event, addr_);
}
} }
void zmq::socket_base_t::event_accepted (std::string &addr_, int fd_) void zmq::socket_base_t::event_accepted (std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_ACCEPTED) { if (monitor_events & ZMQ_EVENT_ACCEPTED)
zmq_event_t event; monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_);
event.event = ZMQ_EVENT_ACCEPTED;
event.value = fd_;
monitor_event (event, addr_);
}
} }
void zmq::socket_base_t::event_accept_failed (std::string &addr_, int err_) void zmq::socket_base_t::event_accept_failed (std::string &addr_, int err_)
{ {
if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) { if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED)
zmq_event_t event; monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_);
event.event = ZMQ_EVENT_ACCEPT_FAILED;
event.value= err_;
monitor_event (event, addr_);
}
} }
void zmq::socket_base_t::event_closed (std::string &addr_, int fd_) void zmq::socket_base_t::event_closed (std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_CLOSED) { if (monitor_events & ZMQ_EVENT_CLOSED)
zmq_event_t event; monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_);
event.event = ZMQ_EVENT_CLOSED;
event.value = fd_;
monitor_event (event, addr_);
}
} }
void zmq::socket_base_t::event_close_failed (std::string &addr_, int err_) void zmq::socket_base_t::event_close_failed (std::string &addr_, int err_)
{ {
if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) { if (monitor_events & ZMQ_EVENT_CLOSE_FAILED)
zmq_event_t event; monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_);
event.event = ZMQ_EVENT_CLOSE_FAILED;
event.value = err_;
monitor_event (event, addr_);
}
} }
void zmq::socket_base_t::event_disconnected (std::string &addr_, int fd_) void zmq::socket_base_t::event_disconnected (std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_DISCONNECTED) { if (monitor_events & ZMQ_EVENT_DISCONNECTED)
zmq_event_t event; monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_);
event.event = ZMQ_EVENT_DISCONNECTED;
event.value = fd_;
monitor_event (event, addr_);
}
} }
void zmq::socket_base_t::monitor_event (zmq_event_t event_, const std::string& addr_) // Send a monitor event
void zmq::socket_base_t::monitor_event (int event_, int value_, const std::string &addr_)
{ {
if (monitor_socket) { if (monitor_socket) {
const uint16_t eid = (uint16_t)event_.event; // Send event in first frame
const uint32_t value = (uint32_t)event_.value;
// prepare and send first message frame
// containing event id and value
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init_size (&msg, sizeof(eid) + sizeof(value)); zmq_msg_init_size (&msg, 6);
char* data1 = (char*)zmq_msg_data(&msg); uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
memcpy (data1, &eid, sizeof(eid)); *(uint16_t *) (data + 0) = (uint16_t) event_;
memcpy (data1+sizeof(eid), &value, sizeof(value)); *(uint32_t *) (data + 2) = (uint32_t) value_;
zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE); zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
// prepare and send second message frame
// containing the address (endpoint) // Send address in second frame
zmq_msg_init_size (&msg, addr_.size()); zmq_msg_init_size (&msg, addr_.size());
memcpy(zmq_msg_data(&msg), addr_.c_str(), addr_.size()); memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
zmq_sendmsg (monitor_socket, &msg, 0); zmq_sendmsg (monitor_socket, &msg, 0);
} }
} }
void zmq::socket_base_t::stop_monitor() void zmq::socket_base_t::stop_monitor (void)
{ {
if (monitor_socket) { if (monitor_socket) {
if (monitor_events & ZMQ_EVENT_MONITOR_STOPPED) { if (monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
zmq_event_t event; monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
event.event = ZMQ_EVENT_MONITOR_STOPPED;
event.value = 0;
monitor_event (event, "");
}
zmq_close (monitor_socket); zmq_close (monitor_socket);
monitor_socket = NULL; monitor_socket = NULL;
monitor_events = 0; monitor_events = 0;
......
...@@ -160,7 +160,7 @@ namespace zmq ...@@ -160,7 +160,7 @@ namespace zmq
void process_destroy (); void process_destroy ();
// Socket event data dispath // Socket event data dispath
void monitor_event (zmq_event_t data_, const std::string& addr_); void monitor_event (int event_, int value_, const std::string& addr_);
// Monitor socket cleanup // Monitor socket cleanup
void stop_monitor (); void stop_monitor ();
......
...@@ -19,254 +19,110 @@ ...@@ -19,254 +19,110 @@
#include "testutil.hpp" #include "testutil.hpp"
// REQ socket events handled // Read one event off the monitor socket; return value and address
static int req_socket_events; // by reference, if not null, and event number by value. Returns -1
// 2nd REQ socket events handled // in case of error.
static int req2_socket_events;
// REP socket events handled
static int rep_socket_events;
std::string addr ; static int
get_monitor_event (void *monitor, int *value, char **address)
static bool read_msg(void* s, zmq_event_t& event, std::string& ep)
{ {
int rc ; // First frame in message contains event number and value
zmq_msg_t msg1; // binary part zmq_msg_t msg;
zmq_msg_init (&msg1); zmq_msg_init (&msg);
zmq_msg_t msg2; // address part if (zmq_msg_recv (&msg, monitor, 0) == -1)
zmq_msg_init (&msg2); return -1; // Interruped, presumably
rc = zmq_msg_recv (&msg1, s, 0); assert (zmq_msg_more (&msg));
if (rc == -1 && zmq_errno() == ETERM)
return true ; uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
uint16_t event = *(uint16_t *) (data);
assert (rc != -1); if (value)
assert (zmq_msg_more(&msg1) != 0); *value = *(uint32_t *) (data + 2);
rc = zmq_msg_recv (&msg2, s, 0);
if (rc == -1 && zmq_errno() == ETERM) // Second frame in message contains event address
return true; zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
assert (rc != -1); return -1; // Interruped, presumably
assert (zmq_msg_more(&msg2) == 0); assert (!zmq_msg_more (&msg));
// copy binary data to event struct
const char* data = (char*)zmq_msg_data(&msg1); if (address) {
memcpy(&event.event, data, sizeof(event.event)); uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
memcpy(&event.value, data+sizeof(event.event), sizeof(event.value)); size_t size = zmq_msg_size (&msg);
// copy address part *address = (char *) malloc (size + 1);
ep = std::string((char*)zmq_msg_data(&msg2), zmq_msg_size(&msg2)); memcpy (*address, data, size);
*address [size] = 0;
if (event.event == ZMQ_EVENT_MONITOR_STOPPED)
return true;
return false;
}
// REQ socket monitor thread
static void req_socket_monitor (void *ctx)
{
zmq_event_t event;
std::string ep ;
int rc;
void *s = zmq_socket (ctx, ZMQ_PAIR);
assert (s);
rc = zmq_connect (s, "inproc://monitor.req");
assert (rc == 0);
while (!read_msg(s, event, ep)) {
assert (ep == addr);
switch (event.event) {
case ZMQ_EVENT_CONNECTED:
assert (event.value > 0);
req_socket_events |= ZMQ_EVENT_CONNECTED;
req2_socket_events |= ZMQ_EVENT_CONNECTED;
break;
case ZMQ_EVENT_CONNECT_DELAYED:
assert (event.value != 0);
req_socket_events |= ZMQ_EVENT_CONNECT_DELAYED;
break;
case ZMQ_EVENT_CLOSE_FAILED:
assert (event.value != 0);
req_socket_events |= ZMQ_EVENT_CLOSE_FAILED;
break;
case ZMQ_EVENT_CLOSED:
assert (event.value != 0);
req_socket_events |= ZMQ_EVENT_CLOSED;
break;
case ZMQ_EVENT_DISCONNECTED:
assert (event.value != 0);
req_socket_events |= ZMQ_EVENT_DISCONNECTED;
break;
}
} }
zmq_close (s); return event;
}
// 2nd REQ socket monitor thread
static void req2_socket_monitor (void *ctx)
{
zmq_event_t event;
std::string ep ;
int rc;
void *s = zmq_socket (ctx, ZMQ_PAIR);
assert (s);
rc = zmq_connect (s, "inproc://monitor.req2");
assert (rc == 0);
while (!read_msg(s, event, ep)) {
assert (ep == addr);
switch (event.event) {
case ZMQ_EVENT_CONNECTED:
assert (event.value > 0);
req2_socket_events |= ZMQ_EVENT_CONNECTED;
break;
case ZMQ_EVENT_CLOSED:
assert (event.value != 0);
req2_socket_events |= ZMQ_EVENT_CLOSED;
break;
}
}
zmq_close (s);
}
// REP socket monitor thread
static void rep_socket_monitor (void *ctx)
{
zmq_event_t event;
std::string ep ;
int rc;
void *s = zmq_socket (ctx, ZMQ_PAIR);
assert (s);
rc = zmq_connect (s, "inproc://monitor.rep");
assert (rc == 0);
while (!read_msg(s, event, ep)) {
assert (ep == addr);
switch (event.event) {
case ZMQ_EVENT_LISTENING:
assert (event.value > 0);
rep_socket_events |= ZMQ_EVENT_LISTENING;
break;
case ZMQ_EVENT_ACCEPTED:
assert (event.value > 0);
rep_socket_events |= ZMQ_EVENT_ACCEPTED;
break;
case ZMQ_EVENT_CLOSE_FAILED:
assert (event.value != 0);
rep_socket_events |= ZMQ_EVENT_CLOSE_FAILED;
break;
case ZMQ_EVENT_CLOSED:
assert (event.value != 0);
rep_socket_events |= ZMQ_EVENT_CLOSED;
break;
case ZMQ_EVENT_DISCONNECTED:
assert (event.value != 0);
rep_socket_events |= ZMQ_EVENT_DISCONNECTED;
break;
}
}
zmq_close (s);
} }
int main (void) int main (void)
{ {
setup_test_environment(); setup_test_environment();
int rc;
void *req;
void *req2;
void *rep;
void* threads [3];
addr = "tcp://127.0.0.1:5560";
// Create the infrastructure
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
// REP socket // We'll monitor these two sockets
rep = zmq_socket (ctx, ZMQ_REP); void *client = zmq_socket (ctx, ZMQ_DEALER);
assert (rep); assert (client);
void *server = zmq_socket (ctx, ZMQ_DEALER);
assert (server);
// Assert supported protocols // Socket monitoring only works over inproc://
rc = zmq_socket_monitor (rep, addr.c_str(), 0); int rc = zmq_socket_monitor (client, "tcp://127.0.0.1:9999", 0);
assert (rc == -1); assert (rc == -1);
assert (zmq_errno() == EPROTONOSUPPORT); assert (zmq_errno () == EPROTONOSUPPORT);
// Deregister monitor
rc = zmq_socket_monitor (rep, NULL, 0);
assert (rc == 0);
// REP socket monitor, all events
rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);
assert (rc == 0);
threads [0] = zmq_threadstart(&rep_socket_monitor, ctx);
// REQ socket
req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
// REQ socket monitor, all events
rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL);
assert (rc == 0);
threads [1] = zmq_threadstart(&req_socket_monitor, ctx);
msleep (SETTLE_TIME);
// Bind REQ and REP // Monitor all events on client and server sockets
rc = zmq_bind (rep, addr.c_str()); rc = zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL);
assert (rc == 0); assert (rc == 0);
rc = zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL);
rc = zmq_connect (req, addr.c_str());
assert (rc == 0); assert (rc == 0);
bounce (rep, req); // Create two sockets for collecting monitor events
void *client_mon = zmq_socket (ctx, ZMQ_PAIR);
// 2nd REQ socket assert (client_mon);
req2 = zmq_socket (ctx, ZMQ_REQ); void *server_mon = zmq_socket (ctx, ZMQ_PAIR);
assert (req2); assert (server_mon);
// 2nd REQ socket monitor, connected event only
rc = zmq_socket_monitor (req2, "inproc://monitor.req2", ZMQ_EVENT_CONNECTED);
assert (rc == 0);
threads [2] = zmq_threadstart(&req2_socket_monitor, ctx);
rc = zmq_connect (req2, addr.c_str()); // Connect these to the inproc endpoints so they'll get events
rc = zmq_connect (client_mon, "inproc://monitor-client");
assert (rc == 0); assert (rc == 0);
rc = zmq_connect (server_mon, "inproc://monitor-server");
// Close the REP socket
rc = zmq_close (rep);
assert (rc == 0); assert (rc == 0);
// Allow enough time for detecting error states // Now do a basic ping test
msleep (250); rc = zmq_bind (server, "tcp://127.0.0.1:9998");
// Close the REQ socket
rc = zmq_close (req);
assert (rc == 0); assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:9998");
// Close the 2nd REQ socket
rc = zmq_close (req2);
assert (rc == 0); assert (rc == 0);
bounce (client, server);
// Close client and server
close_zero_linger (client);
close_zero_linger (server);
// Now collect and check events from both sockets
int event = get_monitor_event (client_mon, NULL, NULL);
if (event == ZMQ_EVENT_CONNECT_DELAYED)
event = get_monitor_event (client_mon, NULL, NULL);
assert (event == ZMQ_EVENT_CONNECTED);
event = get_monitor_event (client_mon, NULL, NULL);
assert (event == ZMQ_EVENT_MONITOR_STOPPED);
// This is the flow of server events
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_LISTENING);
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_ACCEPTED);
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_CLOSED);
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_MONITOR_STOPPED);
// Close down the sockets
close_zero_linger (client_mon);
close_zero_linger (server_mon);
zmq_ctx_term (ctx); zmq_ctx_term (ctx);
// Expected REP socket events
assert (rep_socket_events & ZMQ_EVENT_LISTENING);
assert (rep_socket_events & ZMQ_EVENT_ACCEPTED);
assert (rep_socket_events & ZMQ_EVENT_CLOSED);
// Expected REQ socket events
assert (req_socket_events & ZMQ_EVENT_CONNECTED);
assert (req_socket_events & ZMQ_EVENT_DISCONNECTED);
assert (req_socket_events & ZMQ_EVENT_CLOSED);
// Expected 2nd REQ socket events
assert (req2_socket_events & ZMQ_EVENT_CONNECTED);
assert (!(req2_socket_events & ZMQ_EVENT_CLOSED));
for (unsigned int i = 0; i < 3; ++i)
zmq_threadclose(threads [i]);
return 0 ; return 0 ;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment