Commit 4767159f authored by Lourens Naudé's avatar Lourens Naudé

Initial stab at a context level monitor callback and registration API

parent e13b3723
...@@ -227,7 +227,6 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -227,7 +227,6 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_TCP_KEEPALIVE_IDLE 36 #define ZMQ_TCP_KEEPALIVE_IDLE 36
#define ZMQ_TCP_KEEPALIVE_INTVL 37 #define ZMQ_TCP_KEEPALIVE_INTVL 37
#define ZMQ_TCP_ACCEPT_FILTER 38 #define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_MONITOR 39
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
...@@ -243,17 +242,17 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -243,17 +242,17 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
/* Socket transport events (tcp and ipc only) */ /* Socket transport events (tcp and ipc only) */
#define ZMQ_EVENT_CONNECTED 1 #define ZMQ_EVENT_CONNECTED 1
#define ZMQ_EVENT_CONNECT_DELAYED 2 #define ZMQ_EVENT_CONNECT_DELAYED 2
#define ZMQ_EVENT_CONNECT_RETRIED 3 #define ZMQ_EVENT_CONNECT_RETRIED 4
#define ZMQ_EVENT_LISTENING 4 #define ZMQ_EVENT_LISTENING 8
#define ZMQ_EVENT_BIND_FAILED 5 #define ZMQ_EVENT_BIND_FAILED 16
#define ZMQ_EVENT_ACCEPTED 6 #define ZMQ_EVENT_ACCEPTED 32
#define ZMQ_EVENT_ACCEPT_FAILED 7 #define ZMQ_EVENT_ACCEPT_FAILED 64
#define ZMQ_EVENT_CLOSED 8 #define ZMQ_EVENT_CLOSED 128
#define ZMQ_EVENT_CLOSE_FAILED 9 #define ZMQ_EVENT_CLOSE_FAILED 256
#define ZMQ_EVENT_DISCONNECTED 10 #define ZMQ_EVENT_DISCONNECTED 512
/* Socket event data (union member per event) */ /* Socket event data (union member per event) */
typedef union { typedef union {
...@@ -300,10 +299,9 @@ typedef union { ...@@ -300,10 +299,9 @@ typedef union {
} zmq_event_data_t; } zmq_event_data_t;
/* Callback template for socket state changes */ /* Callback template for socket state changes */
typedef union { typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data);
void *object;
void (*function)(void *s, int event, zmq_event_data_t *data); ZMQ_EXPORT int zmq_monitor (void *context, zmq_monitor_fn *monitor);
} zmq_monitor;
ZMQ_EXPORT void *zmq_socket (void *, int type); ZMQ_EXPORT void *zmq_socket (void *, int type);
ZMQ_EXPORT int zmq_close (void *s); ZMQ_EXPORT int zmq_close (void *s);
......
...@@ -45,7 +45,8 @@ zmq::ctx_t::ctx_t () : ...@@ -45,7 +45,8 @@ zmq::ctx_t::ctx_t () :
slot_count (0), slot_count (0),
slots (NULL), slots (NULL),
max_sockets (ZMQ_MAX_SOCKETS_DFLT), max_sockets (ZMQ_MAX_SOCKETS_DFLT),
io_thread_count (ZMQ_IO_THREADS_DFLT) io_thread_count (ZMQ_IO_THREADS_DFLT),
monitor_fn (NULL)
{ {
} }
...@@ -125,6 +126,12 @@ int zmq::ctx_t::terminate () ...@@ -125,6 +126,12 @@ int zmq::ctx_t::terminate ()
return 0; return 0;
} }
int zmq::ctx_t::monitor (zmq_monitor_fn *monitor_)
{
monitor_fn = monitor_;
return 0;
}
int zmq::ctx_t::set (int option_, int optval_) int zmq::ctx_t::set (int option_, int optval_)
{ {
int rc = 0; int rc = 0;
...@@ -346,6 +353,62 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) ...@@ -346,6 +353,62 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
return endpoint; return endpoint;
} }
void zmq::ctx_t::monitor_event (zmq::socket_base_t *socket_, int event_, ...)
{
if (monitor_fn != NULL) {
va_list args;
zmq_event_data_t data;
memset(&data, 0, sizeof (zmq_event_data_t));
va_start (args, event_);
switch (event_) {
case ZMQ_EVENT_CONNECTED:
data.connected.addr = va_arg (args, char*);
data.connected.fd = va_arg (args, int);
break;
case ZMQ_EVENT_CONNECT_DELAYED:
data.connect_delayed.addr = va_arg (args, char*);
data.connect_delayed.err = va_arg (args, int);
break;
case ZMQ_EVENT_CONNECT_RETRIED:
data.connect_retried.addr = va_arg (args, char*);
data.connect_retried.interval = va_arg (args, int);
break;
case ZMQ_EVENT_LISTENING:
data.listening.addr = va_arg (args, char*);
data.listening.fd = va_arg (args, int);
break;
case ZMQ_EVENT_BIND_FAILED:
data.bind_failed.addr = va_arg (args, char*);
data.bind_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_ACCEPTED:
data.accepted.addr = va_arg (args, char*);
data.accepted.fd = va_arg (args, int);
break;
case ZMQ_EVENT_ACCEPT_FAILED:
data.accept_failed.addr = va_arg (args, char*);
data.accept_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_CLOSED:
data.closed.addr = va_arg (args, char*);
data.closed.fd = va_arg (args, int);
break;
case ZMQ_EVENT_CLOSE_FAILED:
data.close_failed.addr = va_arg (args, char*);
data.close_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_DISCONNECTED:
data.disconnected.addr = va_arg (args, char*);
data.disconnected.fd = va_arg (args, int);
break;
default:
zmq_assert (false);
}
monitor_fn ((void *)socket_, event_, &data);
va_end (args);
}
}
// The last used socket ID, or 0 if no socket was used so far. Note that this // The last used socket ID, or 0 if no socket was used so far. Note that this
// is a global variable. Thus, even sockets created in different contexts have // is a global variable. Thus, even sockets created in different contexts have
// unique IDs. // unique IDs.
......
...@@ -95,12 +95,17 @@ namespace zmq ...@@ -95,12 +95,17 @@ namespace zmq
void unregister_endpoints (zmq::socket_base_t *socket_); void unregister_endpoints (zmq::socket_base_t *socket_);
endpoint_t find_endpoint (const char *addr_); endpoint_t find_endpoint (const char *addr_);
// Monitoring specific
int monitor (zmq_monitor_fn *monitor_);
void monitor_event (zmq::socket_base_t *socket_, int event_, ...);
enum { enum {
term_tid = 0, term_tid = 0,
reaper_tid = 1 reaper_tid = 1
}; };
~ctx_t (); ~ctx_t ();
private: private:
...@@ -163,6 +168,9 @@ namespace zmq ...@@ -163,6 +168,9 @@ namespace zmq
// Synchronisation of access to context options. // Synchronisation of access to context options.
mutex_t opt_sync; mutex_t opt_sync;
// Monitoring callback
zmq_monitor_fn *monitor_fn;
ctx_t (const ctx_t&); ctx_t (const ctx_t&);
const ctx_t &operator = (const ctx_t&); const ctx_t &operator = (const ctx_t&);
}; };
......
...@@ -53,7 +53,6 @@ zmq::options_t::options_t () : ...@@ -53,7 +53,6 @@ zmq::options_t::options_t () :
tcp_keepalive_cnt (-1), tcp_keepalive_cnt (-1),
tcp_keepalive_idle (-1), tcp_keepalive_idle (-1),
tcp_keepalive_intvl (-1), tcp_keepalive_intvl (-1),
monitor (NULL),
socket_id (0) socket_id (0)
{ {
} }
...@@ -314,20 +313,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -314,20 +313,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0; return 0;
} }
} }
case ZMQ_MONITOR:
{
if (optvallen_ == 0 && optval_ == NULL) {
monitor = NULL;
return 0;
}
if (optvallen_ != sizeof (void *)) {
errno = EINVAL;
return -1;
}
monitor = *((zmq_monitor**) &optval_);
return 0;
}
} }
errno = EINVAL; errno = EINVAL;
return -1; return -1;
...@@ -544,15 +529,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) ...@@ -544,15 +529,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
memcpy (optval_, last_endpoint.c_str(), last_endpoint.size()+1); memcpy (optval_, last_endpoint.c_str(), last_endpoint.size()+1);
*optvallen_ = last_endpoint.size()+1; *optvallen_ = last_endpoint.size()+1;
return 0; return 0;
case ZMQ_MONITOR:
if (*optvallen_ < sizeof (void *)) {
errno = EINVAL;
return -1;
}
*((zmq_monitor**) &optval_) = monitor;
*optvallen_ = sizeof (zmq_monitor*);
return 0;
} }
errno = EINVAL; errno = EINVAL;
......
...@@ -125,9 +125,6 @@ namespace zmq ...@@ -125,9 +125,6 @@ namespace zmq
typedef std::vector <tcp_address_mask_t> tcp_accept_filters_t; typedef std::vector <tcp_address_mask_t> tcp_accept_filters_t;
tcp_accept_filters_t tcp_accept_filters; tcp_accept_filters_t tcp_accept_filters;
// Connection and exceptional state callback
zmq_monitor *monitor;
// ID of the socket. // ID of the socket.
int socket_id; int socket_id;
}; };
......
...@@ -981,56 +981,8 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) ...@@ -981,56 +981,8 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
void zmq::socket_base_t::monitor_event (int event_, ...) void zmq::socket_base_t::monitor_event (int event_, ...)
{ {
if (options.monitor != NULL) { va_list args;
va_list args; va_start (args, event_);
zmq_event_data_t data; get_ctx ()->monitor_event (this, event_, args);
memset(&data, 0, sizeof (zmq_event_data_t)); va_end (args);
va_start (args, event_);
switch (event_) {
case ZMQ_EVENT_CONNECTED:
data.connected.addr = va_arg (args, char*);
data.connected.fd = va_arg (args, int);
break;
case ZMQ_EVENT_CONNECT_DELAYED:
data.connect_delayed.addr = va_arg (args, char*);
data.connect_delayed.err = va_arg (args, int);
break;
case ZMQ_EVENT_CONNECT_RETRIED:
data.connect_retried.addr = va_arg (args, char*);
data.connect_retried.interval = va_arg (args, int);
break;
case ZMQ_EVENT_LISTENING:
data.listening.addr = va_arg (args, char*);
data.listening.fd = va_arg (args, int);
break;
case ZMQ_EVENT_BIND_FAILED:
data.bind_failed.addr = va_arg (args, char*);
data.bind_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_ACCEPTED:
data.accepted.addr = va_arg (args, char*);
data.accepted.fd = va_arg (args, int);
break;
case ZMQ_EVENT_ACCEPT_FAILED:
data.accept_failed.addr = va_arg (args, char*);
data.accept_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_CLOSED:
data.closed.addr = va_arg (args, char*);
data.closed.fd = va_arg (args, int);
break;
case ZMQ_EVENT_CLOSE_FAILED:
data.close_failed.addr = va_arg (args, char*);
data.close_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_DISCONNECTED:
data.disconnected.addr = va_arg (args, char*);
data.disconnected.fd = va_arg (args, int);
break;
default:
zmq_assert (false);
}
options.monitor->function ((void *)this, event_, &data);
va_end (args);
}
} }
...@@ -205,6 +205,14 @@ int zmq_ctx_get (void *ctx_, int option_) ...@@ -205,6 +205,14 @@ int zmq_ctx_get (void *ctx_, int option_)
return ((zmq::ctx_t*) ctx_)->get (option_); return ((zmq::ctx_t*) ctx_)->get (option_);
} }
int zmq_monitor (void *ctx_, zmq_monitor_fn *monitor_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t*) ctx_)->monitor (monitor_);
}
// Stable/legacy context API // Stable/legacy context API
......
...@@ -25,61 +25,50 @@ ...@@ -25,61 +25,50 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h" #include "../include/zmq_utils.h"
void listening_sock_monitor (void *s, int event_, zmq_event_data_t *data_) static int events;
void socket_monitor (void *s, int event_, zmq_event_data_t *data_)
{ {
const char *addr = "tcp://127.0.0.1:5560"; const char *addr = "tcp://127.0.0.1:5560";
// Only some of the exceptional events could fire // Only some of the exceptional events could fire
switch (event_) { switch (event_) {
// listener specific
case ZMQ_EVENT_LISTENING: case ZMQ_EVENT_LISTENING:
assert (data_->listening.fd > 0); assert (data_->listening.fd > 0);
assert (memcmp (data_->listening.addr, addr, 22)); assert (memcmp (data_->listening.addr, addr, 22));
events |= ZMQ_EVENT_LISTENING;
break; break;
case ZMQ_EVENT_ACCEPTED: case ZMQ_EVENT_ACCEPTED:
assert (data_->accepted.fd > 0); assert (data_->accepted.fd > 0);
assert (memcmp (data_->accepted.addr, addr, 22)); assert (memcmp (data_->accepted.addr, addr, 22));
events |= ZMQ_EVENT_ACCEPTED;
break; break;
case ZMQ_EVENT_CLOSE_FAILED: // connecter specific
assert (data_->close_failed.err != 0);
assert (memcmp (data_->close_failed.addr, addr, 22));
break;
case ZMQ_EVENT_CLOSED:
assert (data_->closed.fd != 0);
assert (memcmp (data_->closed.addr, addr, 22));
break;
case ZMQ_EVENT_DISCONNECTED:
assert (data_->disconnected.fd != 0);
assert (memcmp (data_->disconnected.addr, addr, 22));
break;
default:
// out of band / unexpected event
assert (0);
}
}
void connecting_sock_monitor (void *s, int event_, zmq_event_data_t *data_)
{
const char *addr = "tcp://127.0.0.1:5560";
// Only some of the exceptional events could fire
switch (event_) {
case ZMQ_EVENT_CONNECTED: case ZMQ_EVENT_CONNECTED:
assert (data_->connected.fd > 0); assert (data_->connected.fd > 0);
assert (memcmp (data_->connected.addr, addr, 22)); assert (memcmp (data_->connected.addr, addr, 22));
events |= ZMQ_EVENT_CONNECTED;
break; break;
case ZMQ_EVENT_CONNECT_DELAYED: case ZMQ_EVENT_CONNECT_DELAYED:
assert (data_->connect_delayed.err != 0); assert (data_->connect_delayed.err != 0);
assert (memcmp (data_->connect_delayed.addr, addr, 22)); assert (memcmp (data_->connect_delayed.addr, addr, 22));
events |= ZMQ_EVENT_CONNECT_DELAYED;
break; break;
// generic - either end of the socket
case ZMQ_EVENT_CLOSE_FAILED: case ZMQ_EVENT_CLOSE_FAILED:
assert (data_->close_failed.err != 0); assert (data_->close_failed.err != 0);
assert (memcmp (data_->close_failed.addr, addr, 22)); assert (memcmp (data_->close_failed.addr, addr, 22));
events |= ZMQ_EVENT_CLOSE_FAILED;
break; break;
case ZMQ_EVENT_CLOSED: case ZMQ_EVENT_CLOSED:
assert (data_->closed.fd != 0); assert (data_->closed.fd != 0);
assert (memcmp (data_->closed.addr, addr, 22)); assert (memcmp (data_->closed.addr, addr, 22));
events |= ZMQ_EVENT_CLOSED;
break; break;
case ZMQ_EVENT_DISCONNECTED: case ZMQ_EVENT_DISCONNECTED:
assert (data_->disconnected.fd != 0); assert (data_->disconnected.fd != 0);
assert (memcmp (data_->disconnected.addr, addr, 22)); assert (memcmp (data_->disconnected.addr, addr, 22));
events |= ZMQ_EVENT_DISCONNECTED;
break; break;
default: default:
// out of band / unexpected event // out of band / unexpected event
...@@ -94,44 +83,18 @@ int main (int argc, char *argv []) ...@@ -94,44 +83,18 @@ int main (int argc, char *argv [])
// Create the infrastructure // Create the infrastructure
void *ctx = zmq_init (1); void *ctx = zmq_init (1);
assert (ctx); assert (ctx);
// set socket monitor
rc = zmq_monitor (ctx, socket_monitor);
assert (rc == 0);
void *rep = zmq_socket (ctx, ZMQ_REP); void *rep = zmq_socket (ctx, ZMQ_REP);
assert (rep); assert (rep);
// Expects failure - invalid size
zmq_monitor monitor;
monitor.function = listening_sock_monitor;
rc = zmq_setsockopt (rep, ZMQ_MONITOR, &monitor, 20);
assert (rc == -1);
assert (errno == EINVAL);
rc = zmq_setsockopt (rep, ZMQ_MONITOR, &monitor, sizeof (void *));
assert (rc == 0);
size_t sz = sizeof (void *);
rc = zmq_getsockopt (rep, ZMQ_MONITOR, &monitor, &sz);
assert (rc == 0);
assert (monitor.function == listening_sock_monitor);
// Remove socket monitor callback
rc = zmq_setsockopt (rep, ZMQ_MONITOR, NULL, 0);
assert (rc == 0);
rc = zmq_getsockopt (rep, ZMQ_MONITOR, &monitor, &sz);
assert (rc == 0);
assert (monitor.function == listening_sock_monitor);
rc = zmq_bind (rep, "tcp://127.0.0.1:5560"); rc = zmq_bind (rep, "tcp://127.0.0.1:5560");
assert (rc == 0); assert (rc == 0);
void *req = zmq_socket (ctx, ZMQ_REQ); void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req); assert (req);
monitor.function = connecting_sock_monitor;
rc = zmq_setsockopt (req, ZMQ_MONITOR, &monitor, sizeof (void *));
assert (rc == 0);
rc = zmq_connect (req, "tcp://127.0.0.1:5560"); rc = zmq_connect (req, "tcp://127.0.0.1:5560");
assert (rc == 0); assert (rc == 0);
...@@ -151,5 +114,12 @@ int main (int argc, char *argv []) ...@@ -151,5 +114,12 @@ int main (int argc, char *argv [])
zmq_sleep (1); zmq_sleep (1);
zmq_term (ctx); zmq_term (ctx);
// We expect to at least observe these events
assert (events & ZMQ_EVENT_LISTENING);
assert (events & ZMQ_EVENT_ACCEPTED);
assert (events & ZMQ_EVENT_CONNECTED);
assert (events & ZMQ_EVENT_CLOSED);
return 0 ; return 0 ;
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment