Commit 759d4533 authored by Lourens Naudé's avatar Lourens Naudé

Significantly reworked the monitoring infrastructure with a more granular per…

Significantly reworked the monitoring infrastructure with a more granular per socket API and to play well with monitoring endpoints in application threads
parent 7a40df6d
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_proxy.3 \ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_proxy.3 \
zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3 \ zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3 \
zmq_init.3 zmq_term.3 zmq_ctx_set_monitor.3\ zmq_init.3 zmq_term.3 \
zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \ zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \
zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \
zmq_msg_send.3 zmq_msg_recv.3 \ zmq_msg_send.3 zmq_msg_recv.3 \
zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \ zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \
zmq_strerror.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \ zmq_socket_monitor.3 zmq_strerror.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \
zmq_sendmsg.3 zmq_recvmsg.3 zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_sendmsg.3 zmq_recvmsg.3 zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3
MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7 MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7
......
...@@ -278,8 +278,16 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -278,8 +278,16 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_EVENT_CLOSE_FAILED 256 #define ZMQ_EVENT_CLOSE_FAILED 256
#define ZMQ_EVENT_DISCONNECTED 512 #define ZMQ_EVENT_DISCONNECTED 512
#define ZMQ_EVENT_ALL ( ZMQ_EVENT_CONNECTED | ZMQ_EVENT_CONNECT_DELAYED | \
ZMQ_EVENT_CONNECT_RETRIED | ZMQ_EVENT_LISTENING | \
ZMQ_EVENT_BIND_FAILED | ZMQ_EVENT_ACCEPTED | \
ZMQ_EVENT_ACCEPT_FAILED | ZMQ_EVENT_CLOSED | \
ZMQ_EVENT_CLOSE_FAILED | ZMQ_EVENT_DISCONNECTED )
/* Socket event data (union member per event) */ /* Socket event data (union member per event) */
typedef union { typedef struct {
int event;
union {
struct { struct {
char *addr; char *addr;
int fd; int fd;
...@@ -320,12 +328,8 @@ typedef union { ...@@ -320,12 +328,8 @@ typedef union {
char *addr; char *addr;
int fd; int fd;
} disconnected; } disconnected;
} zmq_event_data_t; } data;
} zmq_event_t;
/* Callback template for socket state changes */
typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data);
ZMQ_EXPORT int zmq_ctx_set_monitor (void *context, zmq_monitor_fn *monitor);
ZMQ_EXPORT void *zmq_socket (void *, int type); ZMQ_EXPORT void *zmq_socket (void *, int type);
ZMQ_EXPORT int zmq_close (void *s); ZMQ_EXPORT int zmq_close (void *s);
...@@ -339,6 +343,7 @@ ZMQ_EXPORT int zmq_unbind (void *s, const char *addr); ...@@ -339,6 +343,7 @@ ZMQ_EXPORT int zmq_unbind (void *s, const char *addr);
ZMQ_EXPORT int zmq_disconnect (void *s, const char *addr); ZMQ_EXPORT int zmq_disconnect (void *s, const char *addr);
ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events);
ZMQ_EXPORT int zmq_sendmsg (void *s, zmq_msg_t *msg, int flags); ZMQ_EXPORT int zmq_sendmsg (void *s, zmq_msg_t *msg, int flags);
ZMQ_EXPORT int zmq_recvmsg (void *s, zmq_msg_t *msg, int flags); ZMQ_EXPORT int zmq_recvmsg (void *s, zmq_msg_t *msg, int flags);
......
...@@ -45,8 +45,7 @@ zmq::ctx_t::ctx_t () : ...@@ -45,8 +45,7 @@ zmq::ctx_t::ctx_t () :
slot_count (0), slot_count (0),
slots (NULL), slots (NULL),
max_sockets (ZMQ_MAX_SOCKETS_DFLT), max_sockets (ZMQ_MAX_SOCKETS_DFLT),
io_thread_count (ZMQ_IO_THREADS_DFLT), io_thread_count (ZMQ_IO_THREADS_DFLT)
monitor_fn (NULL)
{ {
} }
...@@ -126,12 +125,6 @@ int zmq::ctx_t::terminate () ...@@ -126,12 +125,6 @@ int zmq::ctx_t::terminate ()
return 0; return 0;
} }
int zmq::ctx_t::monitor (zmq_monitor_fn *monitor_)
{
monitor_fn = monitor_;
return 0;
}
int zmq::ctx_t::set (int option_, int optval_) int zmq::ctx_t::set (int option_, int optval_)
{ {
int rc = 0; int rc = 0;
...@@ -353,67 +346,6 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) ...@@ -353,67 +346,6 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
return endpoint; return endpoint;
} }
void zmq::ctx_t::monitor_event (zmq::socket_base_t *socket_, int event_, ...)
{
va_list args;
va_start (args, event_);
va_monitor_event (socket_, event_, args);
va_end (args);
}
void zmq::ctx_t::va_monitor_event (zmq::socket_base_t *socket_, int event_, va_list args_)
{
if (monitor_fn != NULL) {
zmq_event_data_t data;
memset(&data, 0, sizeof (zmq_event_data_t));
switch (event_) {
case ZMQ_EVENT_CONNECTED:
data.connected.addr = va_arg (args_, char*);
data.connected.fd = va_arg (args_, int);
break;
case ZMQ_EVENT_CONNECT_DELAYED:
data.connect_delayed.addr = va_arg (args_, char*);
data.connect_delayed.err = va_arg (args_, int);
break;
case ZMQ_EVENT_CONNECT_RETRIED:
data.connect_retried.addr = va_arg (args_, char*);
data.connect_retried.interval = va_arg (args_, int);
break;
case ZMQ_EVENT_LISTENING:
data.listening.addr = va_arg (args_, char*);
data.listening.fd = va_arg (args_, int);
break;
case ZMQ_EVENT_BIND_FAILED:
data.bind_failed.addr = va_arg (args_, char*);
data.bind_failed.err = va_arg (args_, int);
break;
case ZMQ_EVENT_ACCEPTED:
data.accepted.addr = va_arg (args_, char*);
data.accepted.fd = va_arg (args_, int);
break;
case ZMQ_EVENT_ACCEPT_FAILED:
data.accept_failed.addr = va_arg (args_, char*);
data.accept_failed.err = va_arg (args_, int);
break;
case ZMQ_EVENT_CLOSED:
data.closed.addr = va_arg (args_, char*);
data.closed.fd = va_arg (args_, int);
break;
case ZMQ_EVENT_CLOSE_FAILED:
data.close_failed.addr = va_arg (args_, char*);
data.close_failed.err = va_arg (args_, int);
break;
case ZMQ_EVENT_DISCONNECTED:
data.disconnected.addr = va_arg (args_, char*);
data.disconnected.fd = va_arg (args_, int);
break;
default:
zmq_assert (false);
}
monitor_fn ((void *)socket_, event_, &data);
}
}
// The last used socket ID, or 0 if no socket was used so far. Note that this // The last used socket ID, or 0 if no socket was used so far. Note that this
// is a global variable. Thus, even sockets created in different contexts have // is a global variable. Thus, even sockets created in different contexts have
// unique IDs. // unique IDs.
......
...@@ -95,11 +95,6 @@ namespace zmq ...@@ -95,11 +95,6 @@ namespace zmq
void unregister_endpoints (zmq::socket_base_t *socket_); void unregister_endpoints (zmq::socket_base_t *socket_);
endpoint_t find_endpoint (const char *addr_); endpoint_t find_endpoint (const char *addr_);
// Monitoring specific
int monitor (zmq_monitor_fn *monitor_);
void monitor_event (zmq::socket_base_t *socket_, int event_, ...);
void va_monitor_event (zmq::socket_base_t *socket_, int event_, va_list args_);
enum { enum {
term_tid = 0, term_tid = 0,
reaper_tid = 1 reaper_tid = 1
...@@ -169,9 +164,6 @@ namespace zmq ...@@ -169,9 +164,6 @@ namespace zmq
// Synchronisation of access to context options. // Synchronisation of access to context options.
mutex_t opt_sync; mutex_t opt_sync;
// Monitoring callback
zmq_monitor_fn *monitor_fn;
ctx_t (const ctx_t&); ctx_t (const ctx_t&);
const ctx_t &operator = (const ctx_t&); const ctx_t &operator = (const ctx_t&);
}; };
......
...@@ -56,6 +56,7 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, ...@@ -56,6 +56,7 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
zmq_assert (addr); zmq_assert (addr);
zmq_assert (addr->protocol == "ipc"); zmq_assert (addr->protocol == "ipc");
addr->to_string (endpoint); addr->to_string (endpoint);
socket = session-> get_socket();
} }
zmq::ipc_connecter_t::~ipc_connecter_t () zmq::ipc_connecter_t::~ipc_connecter_t ()
...@@ -121,7 +122,7 @@ void zmq::ipc_connecter_t::out_event () ...@@ -121,7 +122,7 @@ void zmq::ipc_connecter_t::out_event ()
// Shut the connecter down. // Shut the connecter down.
terminate (); terminate ();
session->monitor_event (ZMQ_EVENT_CONNECTED, endpoint.c_str(), fd); socket->event_connected (endpoint.c_str(), fd);
} }
void zmq::ipc_connecter_t::timer_event (int id_) void zmq::ipc_connecter_t::timer_event (int id_)
...@@ -148,7 +149,7 @@ void zmq::ipc_connecter_t::start_connecting () ...@@ -148,7 +149,7 @@ void zmq::ipc_connecter_t::start_connecting ()
handle = add_fd (s); handle = add_fd (s);
handle_valid = true; handle_valid = true;
set_pollout (handle); set_pollout (handle);
session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno()); socket->event_connect_delayed (endpoint.c_str(), zmq_errno());
} }
// Handle any other error condition by eventual reconnect. // Handle any other error condition by eventual reconnect.
...@@ -163,7 +164,7 @@ void zmq::ipc_connecter_t::add_reconnect_timer() ...@@ -163,7 +164,7 @@ void zmq::ipc_connecter_t::add_reconnect_timer()
{ {
int rc_ivl = get_new_reconnect_ivl(); int rc_ivl = get_new_reconnect_ivl();
add_timer (rc_ivl, reconnect_timer_id); add_timer (rc_ivl, reconnect_timer_id);
session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl); socket->event_connect_retried (endpoint.c_str(), rc_ivl);
timer_started = true; timer_started = true;
} }
...@@ -224,7 +225,7 @@ int zmq::ipc_connecter_t::close () ...@@ -224,7 +225,7 @@ int zmq::ipc_connecter_t::close ()
zmq_assert (s != retired_fd); zmq_assert (s != retired_fd);
int rc = ::close (s); int rc = ::close (s);
errno_assert (rc == 0); errno_assert (rc == 0);
session->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s); socket->event_closed (endpoint.c_str(), s);
s = retired_fd; s = retired_fd;
return 0; return 0;
} }
......
...@@ -113,6 +113,9 @@ namespace zmq ...@@ -113,6 +113,9 @@ namespace zmq
// String representation of endpoint to connect to // String representation of endpoint to connect to
std::string endpoint; std::string endpoint;
// Socket
zmq::socket_base_t *socket;
ipc_connecter_t (const ipc_connecter_t&); ipc_connecter_t (const ipc_connecter_t&);
const ipc_connecter_t &operator = (const ipc_connecter_t&); const ipc_connecter_t &operator = (const ipc_connecter_t&);
}; };
......
...@@ -76,7 +76,7 @@ void zmq::ipc_listener_t::in_event () ...@@ -76,7 +76,7 @@ void zmq::ipc_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it. // If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc. // TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) { if (fd == retired_fd) {
socket->monitor_event (ZMQ_EVENT_ACCEPT_FAILED, endpoint.c_str(), zmq_errno()); socket->event_accept_failed (endpoint.c_str(), zmq_errno());
return; return;
} }
...@@ -96,7 +96,7 @@ void zmq::ipc_listener_t::in_event () ...@@ -96,7 +96,7 @@ void zmq::ipc_listener_t::in_event ()
session->inc_seqnum (); session->inc_seqnum ();
launch_child (session); launch_child (session);
send_attach (session, engine, false); send_attach (session, engine, false);
socket->monitor_event (ZMQ_EVENT_ACCEPTED, endpoint.c_str(), fd); socket->event_accepted (endpoint.c_str(), fd);
} }
int zmq::ipc_listener_t::get_address (std::string &addr_) int zmq::ipc_listener_t::get_address (std::string &addr_)
...@@ -155,7 +155,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_) ...@@ -155,7 +155,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if (rc != 0) if (rc != 0)
goto error; goto error;
socket->monitor_event (ZMQ_EVENT_LISTENING, endpoint.c_str(), s); socket->event_listening (endpoint.c_str(), s);
return 0; return 0;
error: error:
...@@ -178,12 +178,12 @@ int zmq::ipc_listener_t::close () ...@@ -178,12 +178,12 @@ int zmq::ipc_listener_t::close ()
if (has_file && !filename.empty ()) { if (has_file && !filename.empty ()) {
rc = ::unlink(filename.c_str ()); rc = ::unlink(filename.c_str ());
if (rc != 0) { if (rc != 0) {
socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno()); socket->event_close_failed (endpoint.c_str(), zmq_errno());
return -1; return -1;
} }
} }
socket->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s); socket->event_closed (endpoint.c_str(), s);
return 0; return 0;
} }
......
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
#include <stdarg.h> #include <stdarg.h>
#include "session_base.hpp" #include "session_base.hpp"
#include "socket_base.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
...@@ -286,17 +285,9 @@ void zmq::session_base_t::hiccuped (pipe_t *) ...@@ -286,17 +285,9 @@ void zmq::session_base_t::hiccuped (pipe_t *)
zmq_assert (false); zmq_assert (false);
} }
void zmq::session_base_t::monitor_event (int event_, ...) zmq::socket_base_t *zmq::session_base_t::get_socket ()
{ {
va_list args; return socket;
va_start (args, event_);
va_monitor_event (event_, args);
va_end (args);
}
void zmq::session_base_t::va_monitor_event (int event_, va_list args)
{
socket->va_monitor_event (event_, args);
} }
void zmq::session_base_t::process_plug () void zmq::session_base_t::process_plug ()
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "pipe.hpp" #include "pipe.hpp"
#include "i_msg_source.hpp" #include "i_msg_source.hpp"
#include "i_msg_sink.hpp" #include "i_msg_sink.hpp"
#include "socket_base.hpp"
namespace zmq namespace zmq
{ {
...@@ -75,8 +76,7 @@ namespace zmq ...@@ -75,8 +76,7 @@ namespace zmq
void hiccuped (zmq::pipe_t *pipe_); void hiccuped (zmq::pipe_t *pipe_);
void terminated (zmq::pipe_t *pipe_); void terminated (zmq::pipe_t *pipe_);
void monitor_event (int event_, ...); socket_base_t *get_socket ();
void va_monitor_event (int event_, va_list args);
protected: protected:
......
...@@ -130,13 +130,16 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -130,13 +130,16 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
destroyed (false), destroyed (false),
last_tsc (0), last_tsc (0),
ticks (0), ticks (0),
rcvmore (false) rcvmore (false),
monitor_socket (NULL),
monitor_events (0)
{ {
options.socket_id = sid_; options.socket_id = sid_;
} }
zmq::socket_base_t::~socket_base_t () zmq::socket_base_t::~socket_base_t ()
{ {
stop_monitor ();
zmq_assert (destroyed); zmq_assert (destroyed);
} }
...@@ -354,7 +357,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -354,7 +357,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ()); int rc = listener->set_address (address.c_str ());
if (rc != 0) { if (rc != 0) {
delete listener; delete listener;
monitor_event (ZMQ_EVENT_BIND_FAILED, addr_, zmq_errno()); event_bind_failed (addr_, zmq_errno());
return -1; return -1;
} }
...@@ -373,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -373,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ()); int rc = listener->set_address (address.c_str ());
if (rc != 0) { if (rc != 0) {
delete listener; delete listener;
monitor_event (ZMQ_EVENT_BIND_FAILED, addr_, zmq_errno()); event_bind_failed (addr_, zmq_errno());
return -1; return -1;
} }
...@@ -845,6 +848,7 @@ void zmq::socket_base_t::process_stop () ...@@ -845,6 +848,7 @@ void zmq::socket_base_t::process_stop ()
// We'll remember the fact so that any blocking call is interrupted and any // We'll remember the fact so that any blocking call is interrupted and any
// further attempt to use the socket will return ETERM. The user is still // further attempt to use the socket will return ETERM. The user is still
// responsible for calling zmq_close on the socket though! // responsible for calling zmq_close on the socket though!
stop_monitor ();
ctx_terminated = true; ctx_terminated = true;
} }
...@@ -996,15 +1000,172 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) ...@@ -996,15 +1000,172 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
rcvmore = msg_->flags () & msg_t::more ? true : false; rcvmore = msg_->flags () & msg_t::more ? true : false;
} }
void zmq::socket_base_t::monitor_event (int event_, ...) int zmq::socket_base_t::monitor (const char *addr_, int events_)
{ {
va_list args; int rc;
va_start (args, event_); if (unlikely (ctx_terminated)) {
va_monitor_event(event_, args); errno = ETERM;
va_end (args); return -1;
}
// Support deregistering monitoring endpoints as well
if (addr_ == NULL) {
stop_monitor ();
return 0;
}
// Parse addr_ string.
std::string protocol;
std::string address;
rc = parse_uri (addr_, protocol, address);
if (rc != 0)
return -1;
rc = check_protocol (protocol);
if (rc != 0)
return -1;
// Event notification only supported over inproc://
if (protocol != "inproc") {
errno = EPROTONOSUPPORT;
return -1;
}
// Register events to monitor
monitor_events = events_;
monitor_socket = zmq_socket( get_ctx (), ZMQ_PAIR);
if (monitor_socket == NULL)
return -1;
// Never block context termination on pending event messages
int linger = 0;
rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
if (rc == -1)
stop_monitor ();
// Spawn the monitor socket endpoint
rc = zmq_bind (monitor_socket, addr_);
if (rc == -1)
stop_monitor ();
return rc;
}
void zmq::socket_base_t::event_connected (const char *addr_, int fd_)
{
zmq_event_t event;
if (!(monitor_events & ZMQ_EVENT_CONNECTED)) return;
event.event = ZMQ_EVENT_CONNECTED;
event.data.connected.addr = (char *)addr_;
event.data.connected.fd = fd_;
monitor_event (event);
} }
void zmq::socket_base_t::va_monitor_event (int event_, va_list args) void zmq::socket_base_t::event_connect_delayed (const char *addr_, int err_)
{ {
get_ctx ()->va_monitor_event (this, event_, args); zmq_event_t event;
if (!(monitor_events & ZMQ_EVENT_CONNECT_DELAYED)) return;
event.event = ZMQ_EVENT_CONNECT_DELAYED;
event.data.connected.addr = (char *)addr_;
event.data.connect_delayed.err = err_;
monitor_event (event);
}
void zmq::socket_base_t::event_connect_retried (const char *addr_, int interval_)
{
zmq_event_t event;
if (!(monitor_events & ZMQ_EVENT_CONNECT_RETRIED)) return;
event.event = ZMQ_EVENT_CONNECT_RETRIED;
event.data.connected.addr = (char *)addr_;
event.data.connect_retried.interval = interval_;
monitor_event (event);
}
void zmq::socket_base_t::event_listening (const char *addr_, int fd_)
{
zmq_event_t event;
if (!(monitor_events & ZMQ_EVENT_LISTENING)) return;
event.event = ZMQ_EVENT_LISTENING;
event.data.connected.addr = (char *)addr_;
event.data.listening.fd = fd_;
monitor_event (event);
}
void zmq::socket_base_t::event_bind_failed (const char *addr_, int err_)
{
zmq_event_t event;
if (!(monitor_events & ZMQ_EVENT_BIND_FAILED)) return;
event.event = ZMQ_EVENT_BIND_FAILED;
event.data.connected.addr = (char *)addr_;
event.data.bind_failed.err = err_;
monitor_event (event);
}
void zmq::socket_base_t::event_accepted (const char *addr_, int fd_)
{
zmq_event_t event;
if (!(monitor_events & ZMQ_EVENT_ACCEPTED)) return;
event.event = ZMQ_EVENT_ACCEPTED;
event.data.connected.addr = (char *)addr_;
event.data.accepted.fd = fd_;
monitor_event (event);
}
void zmq::socket_base_t::event_accept_failed (const char *addr_, int err_)
{
zmq_event_t event;
if (!(monitor_events & ZMQ_EVENT_ACCEPT_FAILED)) return;
event.event = ZMQ_EVENT_ACCEPT_FAILED;
event.data.connected.addr = (char *)addr_;
event.data.accept_failed.err= err_;
monitor_event (event);
}
void zmq::socket_base_t::event_closed (const char *addr_, int fd_)
{
zmq_event_t event;
if (!(monitor_events & ZMQ_EVENT_CLOSED)) return;
event.event = ZMQ_EVENT_CLOSED;
event.data.connected.addr = (char *)addr_;
event.data.closed.fd = fd_;
monitor_event (event);
}
void zmq::socket_base_t::event_close_failed (const char *addr_, int err_)
{
zmq_event_t event;
if (!(monitor_events & ZMQ_EVENT_CLOSE_FAILED)) return;
event.event = ZMQ_EVENT_CLOSE_FAILED;
event.data.connected.addr = (char *)addr_;
event.data.close_failed.err = err_;
monitor_event (event);
}
void zmq::socket_base_t::event_disconnected (const char *addr_, int fd_)
{
zmq_event_t event;
if (!(monitor_events & ZMQ_EVENT_DISCONNECTED)) return;
event.event = ZMQ_EVENT_DISCONNECTED;
event.data.connected.addr = (char *)addr_;
event.data.disconnected.fd = fd_;
monitor_event (event);
}
void zmq::socket_base_t::monitor_event (zmq_event_t event_)
{
zmq_msg_t msg;
if (!monitor_socket) return;
zmq_msg_init_size (&msg, sizeof (event_));
memcpy (zmq_msg_data (&msg), &event_, sizeof (event_));
zmq_sendmsg (monitor_socket, &msg, 0);
zmq_msg_close (&msg);
}
void zmq::socket_base_t::stop_monitor()
{
if (monitor_socket) {
zmq_close (monitor_socket);
monitor_socket = NULL;
monitor_events = 0;
}
} }
\ No newline at end of file
...@@ -102,8 +102,18 @@ namespace zmq ...@@ -102,8 +102,18 @@ namespace zmq
void lock(); void lock();
void unlock(); void unlock();
void monitor_event (int event_, ...); int monitor(const char *endpoint_, int events_);
void va_monitor_event (int event_, va_list args);
void event_connected(const char *addr_, int fd_);
void event_connect_delayed(const char *addr_, int err_);
void event_connect_retried(const char *addr_, int interval_);
void event_listening(const char *addr_, int fd_);
void event_bind_failed(const char *addr_, int err_);
void event_accepted(const char *addr_, int fd_);
void event_accept_failed(const char *addr_, int err_);
void event_closed(const char *addr_, int fd_);
void event_close_failed(const char *addr_, int fd_);
void event_disconnected(const char *addr_, int fd_);
protected: protected:
...@@ -138,6 +148,12 @@ namespace zmq ...@@ -138,6 +148,12 @@ namespace zmq
// Delay actual destruction of the socket. // Delay actual destruction of the socket.
void process_destroy (); void process_destroy ();
// Socket event data dispath
void monitor_event (zmq_event_t data_);
// Monitor socket cleanup
void stop_monitor ();
private: private:
// Creates new endpoint ID and adds the endpoint to the map. // Creates new endpoint ID and adds the endpoint to the map.
void add_endpoint (const char *addr_, own_t *endpoint_); void add_endpoint (const char *addr_, own_t *endpoint_);
...@@ -210,6 +226,12 @@ namespace zmq ...@@ -210,6 +226,12 @@ namespace zmq
// Improves efficiency of time measurement. // Improves efficiency of time measurement.
clock_t clock; clock_t clock;
// Monitor socket;
void *monitor_socket;
// Bitmask of events being monitored
int monitor_events;
socket_base_t (const socket_base_t&); socket_base_t (const socket_base_t&);
const socket_base_t &operator = (const socket_base_t&); const socket_base_t &operator = (const socket_base_t&);
mutex_t sync; mutex_t sync;
......
...@@ -62,7 +62,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons ...@@ -62,7 +62,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
session (NULL), session (NULL),
options (options_), options (options_),
endpoint (endpoint_), endpoint (endpoint_),
plugged (false) plugged (false),
socket (NULL)
{ {
// Put the socket into non-blocking mode. // Put the socket into non-blocking mode.
unblock_socket (s); unblock_socket (s);
...@@ -126,6 +127,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -126,6 +127,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
zmq_assert (!session); zmq_assert (!session);
zmq_assert (session_); zmq_assert (session_);
session = session_; session = session_;
socket = session-> get_socket ();
// Connect to I/O threads poller object. // Connect to I/O threads poller object.
io_object_t::plug (io_thread_); io_object_t::plug (io_thread_);
...@@ -445,7 +447,7 @@ int zmq::stream_engine_t::push_msg (msg_t *msg_) ...@@ -445,7 +447,7 @@ int zmq::stream_engine_t::push_msg (msg_t *msg_)
void zmq::stream_engine_t::error () void zmq::stream_engine_t::error ()
{ {
zmq_assert (session); zmq_assert (session);
session->monitor_event (ZMQ_EVENT_DISCONNECTED, endpoint.c_str(), s); socket->event_disconnected (endpoint.c_str(), s);
session->detach (); session->detach ();
unplug (); unplug ();
delete this; delete this;
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "i_encoder.hpp" #include "i_encoder.hpp"
#include "i_decoder.hpp" #include "i_decoder.hpp"
#include "options.hpp" #include "options.hpp"
#include "socket_base.hpp"
#include "../include/zmq.h" #include "../include/zmq.h"
namespace zmq namespace zmq
...@@ -133,6 +134,9 @@ namespace zmq ...@@ -133,6 +134,9 @@ namespace zmq
bool plugged; bool plugged;
// Socket
zmq::socket_base_t *socket;
stream_engine_t (const stream_engine_t&); stream_engine_t (const stream_engine_t&);
const stream_engine_t &operator = (const stream_engine_t&); const stream_engine_t &operator = (const stream_engine_t&);
}; };
......
...@@ -66,6 +66,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, ...@@ -66,6 +66,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
zmq_assert (addr); zmq_assert (addr);
zmq_assert (addr->protocol == "tcp"); zmq_assert (addr->protocol == "tcp");
addr->to_string (endpoint); addr->to_string (endpoint);
socket = session-> get_socket();
} }
zmq::tcp_connecter_t::~tcp_connecter_t () zmq::tcp_connecter_t::~tcp_connecter_t ()
...@@ -135,7 +136,7 @@ void zmq::tcp_connecter_t::out_event () ...@@ -135,7 +136,7 @@ void zmq::tcp_connecter_t::out_event ()
// Shut the connecter down. // Shut the connecter down.
terminate (); terminate ();
session->monitor_event (ZMQ_EVENT_CONNECTED, endpoint.c_str(), fd); socket->event_connected (endpoint.c_str(), fd);
} }
void zmq::tcp_connecter_t::timer_event (int id_) void zmq::tcp_connecter_t::timer_event (int id_)
...@@ -162,7 +163,7 @@ void zmq::tcp_connecter_t::start_connecting () ...@@ -162,7 +163,7 @@ void zmq::tcp_connecter_t::start_connecting ()
handle = add_fd (s); handle = add_fd (s);
handle_valid = true; handle_valid = true;
set_pollout (handle); set_pollout (handle);
session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno()); socket->event_connect_delayed (endpoint.c_str(), zmq_errno());
} }
// Handle any other error condition by eventual reconnect. // Handle any other error condition by eventual reconnect.
...@@ -177,7 +178,7 @@ void zmq::tcp_connecter_t::add_reconnect_timer() ...@@ -177,7 +178,7 @@ void zmq::tcp_connecter_t::add_reconnect_timer()
{ {
int rc_ivl = get_new_reconnect_ivl(); int rc_ivl = get_new_reconnect_ivl();
add_timer (rc_ivl, reconnect_timer_id); add_timer (rc_ivl, reconnect_timer_id);
session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl); socket->event_connect_retried (endpoint.c_str(), rc_ivl);
timer_started = true; timer_started = true;
} }
...@@ -303,6 +304,6 @@ void zmq::tcp_connecter_t::close () ...@@ -303,6 +304,6 @@ void zmq::tcp_connecter_t::close ()
int rc = ::close (s); int rc = ::close (s);
errno_assert (rc == 0); errno_assert (rc == 0);
#endif #endif
session->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s); socket->event_closed (endpoint.c_str(), s);
s = retired_fd; s = retired_fd;
} }
...@@ -111,6 +111,9 @@ namespace zmq ...@@ -111,6 +111,9 @@ namespace zmq
// String representation of endpoint to connect to // String representation of endpoint to connect to
std::string endpoint; std::string endpoint;
// Socket
zmq::socket_base_t *socket;
tcp_connecter_t (const tcp_connecter_t&); tcp_connecter_t (const tcp_connecter_t&);
const tcp_connecter_t &operator = (const tcp_connecter_t&); const tcp_connecter_t &operator = (const tcp_connecter_t&);
}; };
......
...@@ -85,7 +85,7 @@ void zmq::tcp_listener_t::in_event () ...@@ -85,7 +85,7 @@ void zmq::tcp_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it. // If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc. // TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) { if (fd == retired_fd) {
socket->monitor_event (ZMQ_EVENT_ACCEPT_FAILED, endpoint.c_str(), zmq_errno()); socket->event_accept_failed (endpoint.c_str(), zmq_errno());
return; return;
} }
...@@ -108,7 +108,7 @@ void zmq::tcp_listener_t::in_event () ...@@ -108,7 +108,7 @@ void zmq::tcp_listener_t::in_event ()
session->inc_seqnum (); session->inc_seqnum ();
launch_child (session); launch_child (session);
send_attach (session, engine, false); send_attach (session, engine, false);
socket->monitor_event (ZMQ_EVENT_ACCEPTED, endpoint.c_str(), fd); socket->event_accepted (endpoint.c_str(), fd);
} }
void zmq::tcp_listener_t::close () void zmq::tcp_listener_t::close ()
...@@ -121,7 +121,7 @@ void zmq::tcp_listener_t::close () ...@@ -121,7 +121,7 @@ void zmq::tcp_listener_t::close ()
int rc = ::close (s); int rc = ::close (s);
errno_assert (rc == 0); errno_assert (rc == 0);
#endif #endif
socket->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s); socket->event_closed (endpoint.c_str(), s);
s = retired_fd; s = retired_fd;
} }
...@@ -223,7 +223,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) ...@@ -223,7 +223,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
goto error; goto error;
#endif #endif
socket->monitor_event (ZMQ_EVENT_LISTENING, endpoint.c_str(), s); socket->event_listening (endpoint.c_str(), s);
return 0; return 0;
error: error:
......
...@@ -210,15 +210,6 @@ int zmq_ctx_get (void *ctx_, int option_) ...@@ -210,15 +210,6 @@ int zmq_ctx_get (void *ctx_, int option_)
return ((zmq::ctx_t*) ctx_)->get (option_); return ((zmq::ctx_t*) ctx_)->get (option_);
} }
int zmq_ctx_set_monitor (void *ctx_, zmq_monitor_fn *monitor_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t*) ctx_)->monitor (monitor_);
}
// Stable/legacy context API // Stable/legacy context API
void *zmq_init (int io_threads_) void *zmq_init (int io_threads_)
...@@ -284,6 +275,17 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_) ...@@ -284,6 +275,17 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
return result; return result;
} }
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->monitor (addr_, events_);
return result;
}
int zmq_bind (void *s_, const char *addr_) int zmq_bind (void *s_, const char *addr_)
{ {
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment