Commit 5c6f72c1 authored by Lourens Naudé's avatar Lourens Naudé

ZMQ_MONITOR socket option registers a callback / event sink for changes in socket state

parent 16ec2868
......@@ -20,6 +20,8 @@ autom4te.cache
.*
*~
.*~
tests/test_term_endpoint
tests/test_monitor
tests/test_last_endpoint
tests/test_pair_inproc
tests/test_pair_ipc
......
......@@ -76,6 +76,7 @@ Thijs Terlouw <thijsterlouw@gmail.com>
Toralf Wittner <toralf.wittner@gmail.com>
Tore Halvorsen <tore.halvorsen@gmail.com>
Vitaly Mayatskikh <v.mayatskih@gmail.com>
Lourens Naudé <lourens@methodmissing.com>
Credits
=======
......
......@@ -57,6 +57,8 @@ Building
New functionality
-----------------
* ZMQ_MONITOR socket option registers a callback / event sink for changes in socket state.
* POSIX-compliant zmq_send and zmq_recv introduced (uses raw buffer
instead of message object).
......
......@@ -455,6 +455,16 @@ Option value unit:: -1,>0
Default value:: -1 (leave to OS default)
Applicable socket types:: all, when using TCP transports.
ZMQ_MONITOR: Registers a callback for socket state changes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Registers a callback function / event sink for changes in underlying socket state.
The default value of `NULL` means no monitor callback function.
[horizontal]
Option value type:: zmq_monitor_fn
Option value unit:: N/A
Default value:: no callback function
Applicable socket types:: all
RETURN VALUE
------------
......
......@@ -431,12 +431,51 @@ Default value:: no filters (allow from all)
Applicable socket types:: all listening sockets, when using TCP transports.
ZMQ_MONITOR: Registers a callback for socket state changes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Registers a callback function / event sink for changes in underlying socket state.
Expected signature is `void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data)`
To remove the callback function call `zmq_setsockopt(socket, ZMQ_MONITOR, NULL, 0)`
The default value of `NULL` means no monitor callback function.
Supported events are :
* 'ZMQ_EVENT_CONNECTED' - connection established
* 'ZMQ_EVENT_CONNECT_DELAYED' - connection could not be established synchronously, it's being polled
* 'ZMQ_EVENT_CONNECT_RETRIED' - asynchronous connect / reconnection attempt
* 'ZMQ_EVENT_LISTENING' - socket bound to an address, ready to accept connections
* 'ZMQ_EVENT_BIND_FAILED' - socket couldn't bind to an address
* 'ZMQ_EVENT_ACCEPTED' - connection accepted to bound interface
* 'ZMQ_EVENT_ACCEPT_FAILED' - could not accept client connection
* 'ZMQ_EVENT_CLOSED' - connection closed
* 'ZMQ_EVENT_CLOSE_FAILED' - connection couldn't be closed
* 'ZMQ_EVENT_DISCONNECTED' - broken session
See `zmq_event_data_t` and `ZMQ_EVENT_*` constants in zmq.h for event specific data (third argument to callback).
Please note that both events and their context data aren't stable contracts. The 'ZMQ_MONITOR' socket option is
intended for monitoring infrastructure / operations concerns only - NOT BUSINESS LOGIC. An event is a representation
of something that happened - you cannot change the past, but only react to them. The implementation also only concerned
with a single session. No state of peers, other sessions etc. are tracked - this will only pollute internals and is the
responsibility of application authors to either implement or correlate in another datastore. Monitor events are exceptional
conditions and are thus not directly in the messaging critical path. However, still be careful with what you're doing in the
callback function as severe latency there will block the socket's application thread.
Only tcp and ipc specific transport events are supported in this initial implementation.
[horizontal]
Option value type:: zmq_monitor_fn
Option value unit:: N/A
Default value:: no callback function
Applicable socket types:: all
RETURN VALUE
------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
shall return `-1` and set 'errno' to one of the values defined below.
ERRORS
------
*EINVAL*::
......
......@@ -227,7 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_TCP_KEEPALIVE_IDLE 36
#define ZMQ_TCP_KEEPALIVE_INTVL 37
#define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_MONITOR 39
/* Message options */
#define ZMQ_MORE 1
......@@ -236,6 +236,72 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_DONTWAIT 1
#define ZMQ_SNDMORE 2
/******************************************************************************/
/* 0MQ socket events and monitoring */
/******************************************************************************/
/* Socket transport events (tcp and ipc only) */
#define ZMQ_EVENT_CONNECTED 1
#define ZMQ_EVENT_CONNECT_DELAYED 2
#define ZMQ_EVENT_CONNECT_RETRIED 3
#define ZMQ_EVENT_LISTENING 4
#define ZMQ_EVENT_BIND_FAILED 5
#define ZMQ_EVENT_ACCEPTED 6
#define ZMQ_EVENT_ACCEPT_FAILED 7
#define ZMQ_EVENT_CLOSED 8
#define ZMQ_EVENT_CLOSE_FAILED 9
#define ZMQ_EVENT_DISCONNECTED 10
/* Socket event data (union member per event) */
typedef union {
struct {
char *addr;
int fd;
} connected;
struct {
char *addr;
int err;
} connect_delayed;
struct {
char *addr;
int interval;
} connect_retried;
struct {
char *addr;
int fd;
} listening;
struct {
char *addr;
int err;
} bind_failed;
struct {
char *addr;
int fd;
} accepted;
struct {
char *addr;
int err;
} accept_failed;
struct {
char *addr;
int fd;
} closed;
struct {
char *addr;
int err;
} close_failed;
struct {
char *addr;
int fd;
} disconnected;
} zmq_event_data_t;
/* Callback template for socket state changes */
typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data);
ZMQ_EXPORT void *zmq_socket (void *, int type);
ZMQ_EXPORT int zmq_close (void *s);
ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval,
......
......@@ -52,7 +52,7 @@ zmq::address_t::~address_t ()
#endif
}
int zmq::address_t::to_string (std::string &addr_)
int zmq::address_t::to_string (std::string &addr_) const
{
if (protocol == "tcp") {
if (resolved.tcp_addr) {
......
......@@ -45,7 +45,7 @@ namespace zmq
#endif
} resolved;
int to_string (std::string &addr_);
int to_string (std::string &addr_) const;
};
}
......
......@@ -33,6 +33,7 @@
#include "ip.hpp"
#include "address.hpp"
#include "ipc_address.hpp"
#include "session_base.hpp"
#include <unistd.h>
#include <sys/types.h>
......@@ -53,6 +54,7 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
{
zmq_assert (addr);
zmq_assert (addr->protocol == "ipc");
addr->to_string (endpoint);
}
zmq::ipc_connecter_t::~ipc_connecter_t ()
......@@ -105,6 +107,8 @@ void zmq::ipc_connecter_t::out_event ()
// Shut the connecter down.
terminate ();
session->monitor_event (ZMQ_EVENT_CONNECTED, endpoint.c_str(), fd);
}
void zmq::ipc_connecter_t::timer_event (int id_)
......@@ -132,6 +136,7 @@ void zmq::ipc_connecter_t::start_connecting ()
handle = add_fd (s);
handle_valid = true;
set_pollout (handle);
session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno());
return;
}
......@@ -143,7 +148,9 @@ void zmq::ipc_connecter_t::start_connecting ()
void zmq::ipc_connecter_t::add_reconnect_timer()
{
add_timer (get_new_reconnect_ivl(), reconnect_timer_id);
int rc_ivl = get_new_reconnect_ivl();
add_timer (rc_ivl, reconnect_timer_id);
session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl);
}
int zmq::ipc_connecter_t::get_new_reconnect_ivl ()
......@@ -195,8 +202,11 @@ int zmq::ipc_connecter_t::close ()
{
zmq_assert (s != retired_fd);
int rc = ::close (s);
if (rc != 0)
if (rc != 0) {
session->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
return -1;
}
session->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s);
s = retired_fd;
return 0;
}
......
......@@ -106,6 +106,9 @@ namespace zmq
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
// String representation of endpoint to connect to
std::string endpoint;
ipc_connecter_t (const ipc_connecter_t&);
const ipc_connecter_t &operator = (const ipc_connecter_t&);
};
......
......@@ -33,6 +33,7 @@
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "socket_base.hpp"
#include <unistd.h>
#include <sys/socket.h>
......@@ -75,8 +76,10 @@ void zmq::ipc_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd)
if (fd == retired_fd) {
socket->monitor_event (ZMQ_EVENT_ACCEPT_FAILED, endpoint.c_str(), zmq_errno());
return;
}
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
......@@ -94,6 +97,7 @@ void zmq::ipc_listener_t::in_event ()
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
socket->monitor_event (ZMQ_EVENT_ACCEPTED, endpoint.c_str(), fd);
}
int zmq::ipc_listener_t::get_address (std::string &addr_)
......@@ -133,6 +137,8 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if (s == -1)
return -1;
address.to_string (endpoint);
// Bind the socket to the file path.
rc = bind (s, address.addr (), address.addrlen ());
if (rc != 0)
......@@ -146,6 +152,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if (rc != 0)
return -1;
socket->monitor_event (ZMQ_EVENT_LISTENING, addr_, s);
return 0;
}
......@@ -153,18 +160,23 @@ int zmq::ipc_listener_t::close ()
{
zmq_assert (s != retired_fd);
int rc = ::close (s);
if (rc != 0)
if (rc != 0) {
socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
return -1;
s = retired_fd;
}
// If there's an underlying UNIX domain socket, get rid of the file it
// is associated with.
if (has_file && !filename.empty ()) {
rc = ::unlink(filename.c_str ());
if (rc != 0)
if (rc != 0) {
socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
return -1;
}
}
socket->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s);
s = retired_fd;
return 0;
}
......
......@@ -84,6 +84,9 @@ namespace zmq
// Socket the listerner belongs to.
zmq::socket_base_t *socket;
// String representation of endpoint to bind to
std::string endpoint;
ipc_listener_t (const ipc_listener_t&);
const ipc_listener_t &operator = (const ipc_listener_t&);
};
......
......@@ -53,6 +53,7 @@ zmq::options_t::options_t () :
tcp_keepalive_cnt (-1),
tcp_keepalive_idle (-1),
tcp_keepalive_intvl (-1),
monitor (NULL),
socket_id (0)
{
}
......@@ -313,6 +314,20 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0;
}
}
case ZMQ_MONITOR:
{
if (optvallen_ == 0 && optval_ == NULL) {
monitor = NULL;
return 0;
}
if (optvallen_ != sizeof (void *)) {
errno = EINVAL;
return -1;
}
monitor = ((zmq_monitor_fn*) optval_);
return 0;
}
}
errno = EINVAL;
return -1;
......@@ -530,6 +545,14 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = last_endpoint.size()+1;
return 0;
case ZMQ_MONITOR:
if (*optvallen_ < sizeof (void *)) {
errno = EINVAL;
return -1;
}
((zmq_monitor_fn*) optval_) = monitor;
*optvallen_ = sizeof (void *);
return 0;
}
errno = EINVAL;
......
......@@ -29,6 +29,7 @@
#include "stddef.h"
#include "stdint.hpp"
#include "tcp_address.hpp"
#include "../include/zmq.h"
namespace zmq
{
......@@ -124,6 +125,9 @@ namespace zmq
typedef std::vector <tcp_address_mask_t> tcp_accept_filters_t;
tcp_accept_filters_t tcp_accept_filters;
// Connection and exceptional state callback function
zmq_monitor_fn *monitor;
// ID of the socket.
int socket_id;
};
......
......@@ -255,6 +255,21 @@ void zmq::session_base_t::hiccuped (pipe_t *pipe_)
zmq_assert (false);
}
int zmq::session_base_t::get_address (std::string &addr_)
{
if (addr)
return addr->to_string (addr_);
return -1;
}
void zmq::session_base_t::monitor_event (int event_, ...)
{
va_list args;
va_start (args, event_);
socket->monitor_event (event_, args);
va_end (args);
}
void zmq::session_base_t::process_plug ()
{
if (connect)
......
......@@ -65,6 +65,9 @@ namespace zmq
void hiccuped (zmq::pipe_t *pipe_);
void terminated (zmq::pipe_t *pipe_);
int get_address (std::string &addr_);
void monitor_event (int event_, ...);
protected:
session_base_t (zmq::io_thread_t *io_thread_, bool connect_,
......
......@@ -351,6 +351,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
delete listener;
monitor_event (ZMQ_EVENT_BIND_FAILED, addr_, zmq_errno());
return -1;
}
......@@ -369,6 +370,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
delete listener;
monitor_event (ZMQ_EVENT_BIND_FAILED, addr_, zmq_errno());
return -1;
}
......@@ -980,3 +982,59 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
// Remove MORE flag.
rcvmore = msg_->flags () & msg_t::more ? true : false;
}
void zmq::socket_base_t::monitor_event (int event_, ...)
{
if (options.monitor != NULL) {
va_list args;
zmq_event_data_t data;
memset(&data, 0, sizeof (zmq_event_data_t));
va_start (args, event_);
switch (event_) {
case ZMQ_EVENT_CONNECTED:
data.connected.addr = va_arg (args, char*);
data.connected.fd = va_arg (args, int);
break;
case ZMQ_EVENT_CONNECT_DELAYED:
data.connect_delayed.addr = va_arg (args, char*);
data.connect_delayed.err = va_arg (args, int);
break;
case ZMQ_EVENT_CONNECT_RETRIED:
data.connect_retried.addr = va_arg (args, char*);
data.connect_retried.interval = va_arg (args, int);
break;
case ZMQ_EVENT_LISTENING:
data.listening.addr = va_arg (args, char*);
data.listening.fd = va_arg (args, int);
break;
case ZMQ_EVENT_BIND_FAILED:
data.bind_failed.addr = va_arg (args, char*);
data.bind_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_ACCEPTED:
data.accepted.addr = va_arg (args, char*);
data.accepted.fd = va_arg (args, int);
break;
case ZMQ_EVENT_ACCEPT_FAILED:
data.accept_failed.addr = va_arg (args, char*);
data.accept_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_CLOSED:
data.closed.addr = va_arg (args, char*);
data.closed.fd = va_arg (args, int);
break;
case ZMQ_EVENT_CLOSE_FAILED:
data.close_failed.addr = va_arg (args, char*);
data.close_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_DISCONNECTED:
data.disconnected.addr = va_arg (args, char*);
data.disconnected.fd = va_arg (args, int);
break;
default:
zmq_assert (false);
}
options.monitor ((void *)this, event_, &data);
va_end (args);
}
}
\ No newline at end of file
......@@ -100,6 +100,9 @@ namespace zmq
void terminated (pipe_t *pipe_);
void lock();
void unlock();
void monitor_event (int event_, ...);
protected:
socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
......
......@@ -117,6 +117,8 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
decoder.set_session (session_);
session = session_;
session->get_address (endpoint);
// Connect to I/O threads poller object.
io_object_t::plug (io_thread_);
handle = add_fd (s);
......@@ -143,6 +145,7 @@ void zmq::stream_engine_t::unplug ()
decoder.set_session (NULL);
leftover_session = session;
session = NULL;
endpoint.clear();
}
void zmq::stream_engine_t::terminate ()
......@@ -267,6 +270,7 @@ void zmq::stream_engine_t::activate_in ()
void zmq::stream_engine_t::error ()
{
zmq_assert (session);
session->monitor_event (ZMQ_EVENT_DISCONNECTED, endpoint.c_str(), s);
session->detach ();
unplug ();
delete this;
......
......@@ -30,6 +30,7 @@
#include "encoder.hpp"
#include "decoder.hpp"
#include "options.hpp"
#include "../include/zmq.h"
namespace zmq
{
......@@ -96,6 +97,9 @@ namespace zmq
options_t options;
// String representation of endpoint
std::string endpoint;
bool plugged;
stream_engine_t (const stream_engine_t&);
......
......@@ -31,6 +31,7 @@
#include "ip.hpp"
#include "address.hpp"
#include "tcp_address.hpp"
#include "session_base.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
......@@ -62,6 +63,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
{
zmq_assert (addr);
zmq_assert (addr->protocol == "tcp");
addr->to_string (endpoint);
}
zmq::tcp_connecter_t::~tcp_connecter_t ()
......@@ -117,6 +119,8 @@ void zmq::tcp_connecter_t::out_event ()
// Shut the connecter down.
terminate ();
session->monitor_event (ZMQ_EVENT_CONNECTED, endpoint.c_str(), fd);
}
void zmq::tcp_connecter_t::timer_event (int id_)
......@@ -144,6 +148,7 @@ void zmq::tcp_connecter_t::start_connecting ()
handle = add_fd (s);
handle_valid = true;
set_pollout (handle);
session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno());
return;
}
......@@ -155,7 +160,9 @@ void zmq::tcp_connecter_t::start_connecting ()
void zmq::tcp_connecter_t::add_reconnect_timer()
{
add_timer (get_new_reconnect_ivl(), reconnect_timer_id);
int rc_ivl = get_new_reconnect_ivl();
add_timer (rc_ivl, reconnect_timer_id);
session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl);
}
int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
......@@ -277,10 +284,15 @@ void zmq::tcp_connecter_t::close ()
zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (s);
if (unlikely (rc != SOCKET_ERROR))
session->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
wsa_assert (rc != SOCKET_ERROR);
#else
int rc = ::close (s);
if (unlikely (rc == 0))
session->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
errno_assert (rc == 0);
#endif
session->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s);
s = retired_fd;
}
......@@ -26,6 +26,7 @@
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
#include "../include/zmq.h"
namespace zmq
{
......@@ -103,6 +104,9 @@ namespace zmq
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
// String representation of endpoint to connect to
std::string endpoint;
tcp_connecter_t (const tcp_connecter_t&);
const tcp_connecter_t &operator = (const tcp_connecter_t&);
};
......
......@@ -31,6 +31,7 @@
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "socket_base.hpp"
#ifdef ZMQ_HAVE_WINDOWS
#include "windows.hpp"
......@@ -83,8 +84,10 @@ void zmq::tcp_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd)
if (fd == retired_fd) {
socket->monitor_event (ZMQ_EVENT_ACCEPT_FAILED, endpoint.c_str(), zmq_errno());
return;
}
tune_tcp_socket (fd);
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
......@@ -105,6 +108,7 @@ void zmq::tcp_listener_t::in_event ()
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
socket->monitor_event (ZMQ_EVENT_ACCEPTED, endpoint.c_str(), fd);
}
void zmq::tcp_listener_t::close ()
......@@ -112,11 +116,16 @@ void zmq::tcp_listener_t::close ()
zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (s);
if (unlikely (rc != SOCKET_ERROR))
socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
wsa_assert (rc != SOCKET_ERROR);
#else
int rc = ::close (s);
if (unlikely (rc == 0))
socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
errno_assert (rc == 0);
#endif
socket->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s);
s = retired_fd;
}
......@@ -185,6 +194,8 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
errno_assert (rc == 0);
#endif
address.to_string (endpoint);
// Bind the socket to the network interface and port.
rc = bind (s, address.addr (), address.addrlen ());
#ifdef ZMQ_HAVE_WINDOWS
......@@ -209,6 +220,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
return -1;
#endif
socket->monitor_event (ZMQ_EVENT_LISTENING, addr_, s);
return 0;
}
......
......@@ -27,6 +27,7 @@
#include "stdint.hpp"
#include "io_object.hpp"
#include "tcp_address.hpp"
#include "../include/zmq.h"
namespace zmq
{
......@@ -78,6 +79,9 @@ namespace zmq
// Socket the listerner belongs to.
zmq::socket_base_t *socket;
// String representation of endpoint to bind to
std::string endpoint;
tcp_listener_t (const tcp_listener_t&);
const tcp_listener_t &operator = (const tcp_listener_t&);
};
......
......@@ -14,7 +14,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_msg_flags \
test_connect_resolve \
test_last_endpoint \
test_term_endpoint
test_term_endpoint \
test_monitor
if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \
......@@ -35,6 +36,7 @@ test_msg_flags_SOURCES = test_msg_flags.cpp
test_connect_resolve_SOURCES = test_connect_resolve.cpp
test_last_endpoint_SOURCES = test_last_endpoint.cpp
test_term_endpoint_SOURCES = test_term_endpoint.cpp
test_monitor_SOURCES = test_monitor.cpp
if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
......
/*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2011 250bpm s.r.o.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <string.h>
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
void listening_sock_monitor (void *s, int event_, zmq_event_data_t *data_)
{
const char *addr = "tcp://127.0.0.1:5560";
// Only some of the exceptional events could fire
switch (event_) {
case ZMQ_EVENT_LISTENING:
assert (data_->listening.fd > 0);
assert (memcmp (data_->listening.addr, addr, 22));
break;
case ZMQ_EVENT_ACCEPTED:
assert (data_->accepted.fd > 0);
assert (memcmp (data_->accepted.addr, addr, 22));
break;
case ZMQ_EVENT_CLOSE_FAILED:
assert (data_->close_failed.err != 0);
assert (memcmp (data_->close_failed.addr, addr, 22));
break;
case ZMQ_EVENT_CLOSED:
assert (data_->closed.fd != 0);
assert (memcmp (data_->closed.addr, addr, 22));
break;
case ZMQ_EVENT_DISCONNECTED:
assert (data_->disconnected.fd != 0);
assert (memcmp (data_->disconnected.addr, addr, 22));
break;
default:
// out of band / unexpected event
assert (0);
}
}
void connecting_sock_monitor (void *s, int event_, zmq_event_data_t *data_)
{
const char *addr = "tcp://127.0.0.1:5560";
// Only some of the exceptional events could fire
switch (event_) {
case ZMQ_EVENT_CONNECTED:
assert (data_->connected.fd > 0);
assert (memcmp (data_->connected.addr, addr, 22));
break;
case ZMQ_EVENT_CONNECT_DELAYED:
assert (data_->connect_delayed.err != 0);
assert (memcmp (data_->connect_delayed.addr, addr, 22));
break;
case ZMQ_EVENT_CLOSE_FAILED:
assert (data_->close_failed.err != 0);
assert (memcmp (data_->close_failed.addr, addr, 22));
break;
case ZMQ_EVENT_CLOSED:
assert (data_->closed.fd != 0);
assert (memcmp (data_->closed.addr, addr, 22));
break;
case ZMQ_EVENT_DISCONNECTED:
assert (data_->disconnected.fd != 0);
assert (memcmp (data_->disconnected.addr, addr, 22));
break;
default:
// out of band / unexpected event
assert (0);
}
}
int main (int argc, char *argv [])
{
int rc;
// Create the infrastructure
void *ctx = zmq_init (1);
assert (ctx);
void *rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
// Expects failure - invalid size
rc = zmq_setsockopt (rep, ZMQ_MONITOR, (void *)listening_sock_monitor, 20);
assert (rc == -1);
assert (errno == EINVAL);
rc = zmq_setsockopt (rep, ZMQ_MONITOR, (void *)listening_sock_monitor, sizeof (void *));
assert (rc == 0);
void *monitor;
size_t sz = sizeof (void *);
rc = zmq_getsockopt (rep, ZMQ_MONITOR, monitor, &sz);
assert (rc == 0);
assert (monitor == listening_sock_monitor);
// Remove socket monitor callback
rc = zmq_setsockopt (rep, ZMQ_MONITOR, NULL, 0);
assert (rc == 0);
rc = zmq_getsockopt (rep, ZMQ_MONITOR, monitor, &sz);
assert (rc == 0);
assert (monitor == listening_sock_monitor);
rc = zmq_bind (rep, "tcp://127.0.0.1:5560");
assert (rc == 0);
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
rc = zmq_setsockopt (req, ZMQ_MONITOR, (void *)connecting_sock_monitor, sizeof (void *));
assert (rc == 0);
rc = zmq_connect (req, "tcp://127.0.0.1:5560");
assert (rc == 0);
// Allow a window for socket events as connect can be async
zmq_sleep (1);
// Deallocate the infrastructure.
rc = zmq_close (req);
assert (rc == 0);
// Allow for closed or disconnected events to bubble up
zmq_sleep (1);
rc = zmq_close (rep);
assert (rc == 0);
zmq_sleep (1);
zmq_term (ctx);
return 0 ;
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment