Unverified Commit 33a493c4 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3379 from sigiesec/add-v2-monitoring

Add v2 monitoring
parents 4d8e5fb2 0e2e303b
......@@ -639,6 +639,7 @@ set(cxx-sources
devpoll.cpp
dgram.cpp
dist.cpp
endpoint.cpp
epoll.cpp
err.cpp
fq.cpp
......@@ -657,6 +658,7 @@ set(cxx-sources
metadata.cpp
msg.cpp
mtrie.cpp
norm_engine.cpp
object.cpp
options.cpp
own.cpp
......@@ -745,6 +747,7 @@ set(cxx-sources
dish.hpp
dist.hpp
encoder.hpp
endpoint.hpp
epoll.hpp
err.hpp
fd.hpp
......
......@@ -55,6 +55,8 @@ src_libzmq_la_SOURCES = \
src/dist.cpp \
src/dist.hpp \
src/encoder.hpp \
src/endpoint.hpp \
src/endpoint.cpp \
src/epoll.cpp \
src/epoll.hpp \
src/err.cpp \
......@@ -539,7 +541,8 @@ tests_test_srcfd_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_srcfd_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_monitor_SOURCES = tests/test_monitor.cpp
tests_test_monitor_LDADD = src/libzmq.la
tests_test_monitor_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_monitor_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_router_mandatory_SOURCES = tests/test_router_mandatory.cpp tests/testutil_unity.hpp
tests_test_router_mandatory_LDADD = src/libzmq.la ${UNITY_LIBS}
......@@ -599,6 +602,7 @@ tests_test_security_plain_LDADD = src/libzmq.la
tests_test_security_zap_SOURCES = \
tests/test_security_zap.cpp \
tests/testutil_monitoring.hpp \
tests/testutil_security.hpp \
tests/testutil.hpp
tests_test_security_zap_LDADD = src/libzmq.la
......
......@@ -21,6 +21,11 @@ method creates a 'ZMQ_PAIR' socket and binds that to the specified
inproc:// 'endpoint'. To collect the socket events, you must create
your own 'ZMQ_PAIR' socket, and connect that to the endpoint.
Note that there is also a DRAFT function linkzmq:zmq_socket_monitor_versioned[3],
which allows to subscribe to events that provide more information.
Calling zmq_socket_monitor is equivalent to calling 'zmq_socket_monitor_versioned'
with the 'event_version' parameter set to 1, with the exception of error cases.
The 'events' argument is a bitmask of the socket events you wish to
monitor, see 'Supported events' below. To monitor all events, use the
event value ZMQ_EVENT_ALL. NOTE: as new events are added, the catch-all
......@@ -31,7 +36,7 @@ guarantee compatibility with future versions.
Each event is sent as two frames. The first frame contains an event
number (16 bits), and an event value (32 bits) that provides additional
data according to the event number. The second frame contains a string
that specifies the affected TCP or IPC endpoint.
that specifies the affected endpoint.
----
The _zmq_socket_monitor()_ method supports only connection-oriented
......
This diff is collapsed.
......@@ -723,6 +723,17 @@ ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket,
const void *routing_id,
size_t routing_id_size);
#define ZMQ_CURRENT_EVENT_VERSION 1
#define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2
#define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1
ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_,
const char *addr_,
uint64_t events_,
int event_version_);
#endif // ZMQ_BUILD_DRAFT_API
......
......@@ -108,3 +108,17 @@ int zmq::address_t::to_string (std::string &addr_) const
addr_.clear ();
return -1;
}
zmq::zmq_socklen_t zmq::get_socket_address (fd_t fd_,
socket_end_t socket_end_,
sockaddr_storage *ss_)
{
zmq_socklen_t sl = static_cast<zmq_socklen_t> (sizeof (*ss_));
const int rc =
socket_end_ == socket_end_local
? getsockname (fd_, reinterpret_cast<struct sockaddr *> (ss_), &sl)
: getpeername (fd_, reinterpret_cast<struct sockaddr *> (ss_), &sl);
return rc != 0 ? 0 : sl;
}
......@@ -30,8 +30,16 @@
#ifndef __ZMQ_ADDRESS_HPP_INCLUDED__
#define __ZMQ_ADDRESS_HPP_INCLUDED__
#include "fd.hpp"
#include <string>
#ifndef ZMQ_HAVE_WINDOWS
#include <sys/socket.h>
#else
#include <ws2tcpip.h>
#endif
namespace zmq
{
class ctx_t;
......@@ -97,6 +105,37 @@ struct address_t
int to_string (std::string &addr_) const;
};
#if defined(ZMQ_HAVE_HPUX) || defined(ZMQ_HAVE_VXWORKS) \
|| defined(ZMQ_HAVE_WINDOWS)
typedef int zmq_socklen_t;
#else
typedef socklen_t zmq_socklen_t;
#endif
enum socket_end_t
{
socket_end_local,
socket_end_remote
};
zmq_socklen_t
get_socket_address (fd_t fd_, socket_end_t socket_end_, sockaddr_storage *ss_);
template <typename T>
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_)
{
struct sockaddr_storage ss;
const zmq_socklen_t sl = get_socket_address (fd_, socket_end_, &ss);
if (sl == 0) {
return std::string ();
}
const T addr (reinterpret_cast<struct sockaddr *> (&ss), sl);
std::string address_string;
addr.to_string (address_string);
return address_string;
}
}
#endif
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "endpoint.hpp"
zmq::endpoint_uri_pair_t
zmq::make_unconnected_connect_endpoint_pair (const std::string &endpoint_)
{
return endpoint_uri_pair_t (std::string (), endpoint_,
endpoint_type_connect);
}
zmq::endpoint_uri_pair_t
zmq::make_unconnected_bind_endpoint_pair (const std::string &endpoint_)
{
return endpoint_uri_pair_t (endpoint_, std::string (), endpoint_type_bind);
}
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ENDPOINT_HPP_INCLUDED__
#define __ZMQ_ENDPOINT_HPP_INCLUDED__
#include <string>
namespace zmq
{
enum endpoint_type_t
{
endpoint_type_none, // a connection-less endpoint
endpoint_type_bind, // a connection-oriented bind endpoint
endpoint_type_connect // a connection-oriented connect endpoint
};
struct endpoint_uri_pair_t
{
endpoint_uri_pair_t () : local_type (endpoint_type_none) {}
endpoint_uri_pair_t (const std::string &local,
const std::string &remote,
endpoint_type_t local_type) :
local (local),
remote (remote),
local_type (local_type)
{
}
const std::string &identifier () const
{
return local_type == endpoint_type_bind ? local : remote;
}
std::string local, remote;
endpoint_type_t local_type;
};
endpoint_uri_pair_t
make_unconnected_connect_endpoint_pair (const std::string &endpoint_);
endpoint_uri_pair_t
make_unconnected_bind_endpoint_pair (const std::string &endpoint_);
}
#endif
......@@ -30,6 +30,8 @@
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
#define __ZMQ_I_ENGINE_HPP_INCLUDED__
#include "endpoint.hpp"
namespace zmq
{
class io_thread_t;
......@@ -61,7 +63,7 @@ struct i_engine
virtual void zap_msg_available () = 0;
virtual const char *get_endpoint () const = 0;
virtual const endpoint_uri_pair_t &get_endpoint () const = 0;
};
}
......
......@@ -74,7 +74,7 @@ void zmq::ipc_connecter_t::out_event ()
return;
}
create_engine (fd);
create_engine (fd, get_socket_name<ipc_address_t> (fd, socket_end_local));
}
void zmq::ipc_connecter_t::start_connecting ()
......@@ -92,7 +92,8 @@ void zmq::ipc_connecter_t::start_connecting ()
else if (rc == -1 && errno == EINPROGRESS) {
_handle = add_fd (_s);
set_pollout (_handle);
_socket->event_connect_delayed (_endpoint, zmq_errno ());
_socket->event_connect_delayed (
make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
// TODO, tcp_connecter_t adds a connect timer in this case; maybe this
// should be done here as well (and then this could be pulled up to
......
......@@ -43,6 +43,7 @@
#include "err.hpp"
#include "ip.hpp"
#include "socket_base.hpp"
#include "address.hpp"
#include <unistd.h>
#include <sys/socket.h>
......@@ -142,7 +143,8 @@ void zmq::ipc_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) {
_socket->event_accept_failed (_endpoint, zmq_errno ());
_socket->event_accept_failed (
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return;
}
......@@ -150,20 +152,14 @@ void zmq::ipc_listener_t::in_event ()
create_engine (fd);
}
int zmq::ipc_listener_t::get_address (std::string &addr_)
std::string
zmq::ipc_listener_t::get_socket_name (zmq::fd_t fd_,
socket_end_t socket_end_) const
{
struct sockaddr_storage ss;
const zmq_socklen_t sl = get_socket_address (&ss);
if (sl == 0) {
addr_.clear ();
return -1;
}
ipc_address_t addr (reinterpret_cast<struct sockaddr *> (&ss), sl);
return addr.to_string (addr_);
return zmq::get_socket_name<ipc_address_t> (fd_, socket_end_);
}
int zmq::ipc_listener_t::set_address (const char *addr_)
int zmq::ipc_listener_t::set_local_address (const char *addr_)
{
// Create addr on stack for auto-cleanup
std::string addr (addr_);
......@@ -232,7 +228,8 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
_filename = ZMQ_MOVE (addr);
_has_file = true;
_socket->event_listening (_endpoint, _s);
_socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint),
_s);
return 0;
error:
......@@ -260,12 +257,14 @@ int zmq::ipc_listener_t::close ()
}
if (rc != 0) {
_socket->event_close_failed (_endpoint, zmq_errno ());
_socket->event_close_failed (
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return -1;
}
}
_socket->event_closed (_endpoint, fd_for_event);
_socket->event_closed (make_unconnected_bind_endpoint_pair (_endpoint),
fd_for_event);
return 0;
}
......
......@@ -48,10 +48,10 @@ class ipc_listener_t : public stream_listener_base_t
const options_t &options_);
// Set address to listen on.
int set_address (const char *addr_);
int set_local_address (const char *addr_);
// Get the bound address for use with wildcards
int get_address (std::string &addr_);
protected:
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
private:
// Handlers for I/O events.
......
......@@ -709,9 +709,9 @@ zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem ()
return nextItem;
} // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
const char *zmq::norm_engine_t::get_endpoint () const
const zmq::endpoint_uri_pair_t &zmq::norm_engine_t::get_endpoint () const
{
return "";
return _empty_endpoint;
}
#endif // ZMQ_HAVE_NORM
......@@ -47,7 +47,7 @@ class norm_engine_t : public io_object_t, public i_engine
virtual void zap_msg_available (){};
virtual const char *get_endpoint () const;
virtual const endpoint_uri_pair_t &get_endpoint () const;
// i_poll_events interface implementation.
// (we only need in_event() for NormEvent notification)
......@@ -150,6 +150,8 @@ class norm_engine_t : public io_object_t, public i_engine
}; // end class zmq::norm_engine_t::NormRxStreamState
const endpoint_uri_pair_t _empty_endpoint;
session_base_t *zmq_session;
options_t options;
NormInstanceHandle norm_instance;
......
......@@ -244,7 +244,8 @@ zmq::options_t::options_t () :
loopback_fastpath (false),
multicast_loop (true),
zero_copy (true),
router_notify (0)
router_notify (0),
monitor_event_version (1)
{
memset (curve_public_key, 0, CURVE_KEYSIZE);
memset (curve_secret_key, 0, CURVE_KEYSIZE);
......
......@@ -266,6 +266,9 @@ struct options_t
// Application metadata
std::map<std::string, std::string> app_metadata;
// Version of monitor events to emit
int monitor_event_version;
};
inline bool get_effective_conflate_option (const options_t &options)
......
......@@ -156,9 +156,9 @@ bool zmq::pgm_receiver_t::restart_input ()
return true;
}
const char *zmq::pgm_receiver_t::get_endpoint () const
const zmq::endpoint_uri_pair_t &zmq::pgm_receiver_t::get_endpoint () const
{
return "";
return _empty_endpoint;
}
void zmq::pgm_receiver_t::in_event ()
......
......@@ -60,7 +60,7 @@ class pgm_receiver_t : public io_object_t, public i_engine
bool restart_input ();
void restart_output ();
void zap_msg_available () {}
const char *get_endpoint () const;
const endpoint_uri_pair_t &get_endpoint () const;
// i_poll_events interface implementation.
void in_event ();
......@@ -84,6 +84,8 @@ class pgm_receiver_t : public io_object_t, public i_engine
rx_timer_id = 0xa1
};
const endpoint_uri_pair_t _empty_endpoint;
// RX timer is running.
bool has_rx_timer;
......
......@@ -143,9 +143,9 @@ bool zmq::pgm_sender_t::restart_input ()
return true;
}
const char *zmq::pgm_sender_t::get_endpoint () const
const zmq::endpoint_uri_pair_t &zmq::pgm_sender_t::get_endpoint () const
{
return "";
return _empty_endpoint;
}
zmq::pgm_sender_t::~pgm_sender_t ()
......
......@@ -59,7 +59,7 @@ class pgm_sender_t : public io_object_t, public i_engine
bool restart_input ();
void restart_output ();
void zap_msg_available () {}
const char *get_endpoint () const;
const endpoint_uri_pair_t &get_endpoint () const;
// i_poll_events interface implementation.
void in_event ();
......@@ -77,6 +77,8 @@ class pgm_sender_t : public io_object_t, public i_engine
rx_timer_id = 0xa1
};
const endpoint_uri_pair_t _empty_endpoint;
// Timers are running.
bool has_tx_timer;
bool has_rx_timer;
......
......@@ -554,12 +554,12 @@ void zmq::pipe_t::send_hwms_to_peer (int inhwm_, int outhwm_)
send_pipe_hwm (_peer, inhwm_, outhwm_);
}
void zmq::pipe_t::set_endpoint_uri (const char *name_)
void zmq::pipe_t::set_endpoint_pair (zmq::endpoint_uri_pair_t endpoint_pair_)
{
_endpoint_uri = name_;
_endpoint_pair = endpoint_pair_;
}
std::string &zmq::pipe_t::get_endpoint_uri ()
const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const
{
return _endpoint_uri;
return _endpoint_pair;
}
......@@ -37,6 +37,7 @@
#include "array.hpp"
#include "blob.hpp"
#include "options.hpp"
#include "endpoint.hpp"
namespace zmq
{
......@@ -141,8 +142,8 @@ class pipe_t : public object_t,
// Returns true if HWM is not reached
bool check_hwm () const;
void set_endpoint_uri (const char *name_);
std::string &get_endpoint_uri ();
void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_);
const endpoint_uri_pair_t &get_endpoint_pair () const;
private:
// Type of the underlying lock-free pipe.
......@@ -247,9 +248,8 @@ class pipe_t : public object_t,
const bool _conflate;
// If the pipe belongs to socket's endpoint the endpoint's name is stored here.
// Otherwise this is empty.
std::string _endpoint_uri;
// The endpoints of this pipe.
endpoint_uri_pair_t _endpoint_pair;
// Disable copying.
pipe_t (const pipe_t &);
......
......@@ -117,7 +117,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
{
}
const char *zmq::session_base_t::get_endpoint () const
const zmq::endpoint_uri_pair_t &zmq::session_base_t::get_endpoint () const
{
return _engine->get_endpoint ();
}
......
......@@ -92,7 +92,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
int write_zap_msg (msg_t *msg_);
socket_base_t *get_socket ();
const char *get_endpoint () const;
const endpoint_uri_pair_t &get_endpoint () const;
protected:
session_base_t (zmq::io_thread_t *io_thread_,
......
This diff is collapsed.
......@@ -43,6 +43,7 @@
#include "i_mailbox.hpp"
#include "clock.hpp"
#include "pipe.hpp"
#include "endpoint.hpp"
extern "C" {
void zmq_free_event (void *data_, void *hint_);
......@@ -118,26 +119,38 @@ class socket_base_t : public own_t,
void lock ();
void unlock ();
int monitor (const char *endpoint_, int events_);
int monitor (const char *endpoint_, uint64_t events_, int event_version_);
void event_connected (const std::string &endpoint_uri_, zmq::fd_t fd_);
void event_connect_delayed (const std::string &endpoint_uri_, int err_);
void event_connect_retried (const std::string &endpoint_uri_,
void event_connected (const endpoint_uri_pair_t &endpoint_uri_pair_,
zmq::fd_t fd_);
void event_connect_delayed (const endpoint_uri_pair_t &endpoint_uri_pair_,
int err_);
void event_connect_retried (const endpoint_uri_pair_t &endpoint_uri_pair_,
int interval_);
void event_listening (const std::string &endpoint_uri_, zmq::fd_t fd_);
void event_bind_failed (const std::string &endpoint_uri_, int err_);
void event_accepted (const std::string &endpoint_uri_, zmq::fd_t fd_);
void event_accept_failed (const std::string &endpoint_uri_, int err_);
void event_closed (const std::string &endpoint_uri_, zmq::fd_t fd_);
void event_close_failed (const std::string &endpoint_uri_, int err_);
void event_disconnected (const std::string &endpoint_uri_, zmq::fd_t fd_);
void event_handshake_failed_no_detail (const std::string &endpoint_uri_,
int err_);
void event_handshake_failed_protocol (const std::string &endpoint_uri_,
int err_);
void event_handshake_failed_auth (const std::string &endpoint_uri_,
int err_);
void event_handshake_succeeded (const std::string &endpoint_uri_, int err_);
void event_listening (const endpoint_uri_pair_t &endpoint_uri_pair_,
zmq::fd_t fd_);
void event_bind_failed (const endpoint_uri_pair_t &endpoint_uri_pair_,
int err_);
void event_accepted (const endpoint_uri_pair_t &endpoint_uri_pair_,
zmq::fd_t fd_);
void event_accept_failed (const endpoint_uri_pair_t &endpoint_uri_pair_,
int err_);
void event_closed (const endpoint_uri_pair_t &endpoint_uri_pair_,
zmq::fd_t fd_);
void event_close_failed (const endpoint_uri_pair_t &endpoint_uri_pair_,
int err_);
void event_disconnected (const endpoint_uri_pair_t &endpoint_uri_pair_,
zmq::fd_t fd_);
void event_handshake_failed_no_detail (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_);
void event_handshake_failed_protocol (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_);
void
event_handshake_failed_auth (const endpoint_uri_pair_t &endpoint_uri_pair_,
int err_);
void
event_handshake_succeeded (const endpoint_uri_pair_t &endpoint_uri_pair_,
int err_);
// Query the state of a specific peer. The default implementation
// always returns an ENOTSUP error.
......@@ -186,19 +199,22 @@ class socket_base_t : public own_t,
private:
// test if event should be sent and then dispatch it
void event (const std::string &endpoint_uri_, intptr_t value_, int type_);
void event (const endpoint_uri_pair_t &endpoint_uri_pair_,
uint64_t value_,
uint64_t type_);
// Socket event data dispatch
void monitor_event (int event_,
intptr_t value_,
const std::string &endpoint_uri_) const;
void monitor_event (uint64_t event_,
uint64_t value_,
const endpoint_uri_pair_t &endpoint_uri_pair_) const;
// Monitor socket cleanup
void stop_monitor (bool send_monitor_stopped_event_ = true);
// Creates new endpoint ID and adds the endpoint to the map.
void
add_endpoint (const char *endpoint_uri_, own_t *endpoint_, pipe_t *pipe_);
void add_endpoint (const endpoint_uri_pair_t &endpoint_pair_,
own_t *endpoint_,
pipe_t *pipe_);
// Map of open endpoints.
typedef std::pair<own_t *, pipe_t *> endpoint_pipe_t;
......@@ -295,7 +311,7 @@ class socket_base_t : public own_t,
void *_monitor_socket;
// Bitmask of events being monitored
int _monitor_events;
int64_t _monitor_events;
// Last socket endpoint resolved URI
std::string _last_endpoint;
......
......@@ -150,15 +150,19 @@ void zmq::socks_connecter_t::in_event ()
if (rc == -1)
error ();
else {
const endpoint_uri_pair_t endpoint_pair = endpoint_uri_pair_t (
get_socket_name<tcp_address_t> (_s, socket_end_local),
_endpoint, endpoint_type_connect);
// Create the engine object for this connection.
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (_s, options, _endpoint);
stream_engine_t *engine = new (std::nothrow)
stream_engine_t (_s, options, endpoint_pair);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
send_attach (_session, engine);
_socket->event_connected (_endpoint, _s);
_socket->event_connected (endpoint_pair, _s);
rm_fd (_handle);
_s = -1;
......@@ -225,7 +229,8 @@ void zmq::socks_connecter_t::initiate_connect ()
_handle = add_fd (_s);
set_pollout (_handle);
_status = waiting_for_proxy_connection;
_socket->event_connect_delayed (_endpoint, zmq_errno ());
_socket->event_connect_delayed (
make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
}
// Handle any other error condition by eventual reconnect.
else {
......@@ -272,7 +277,8 @@ void zmq::socks_connecter_t::start_timer ()
const int interval = get_new_reconnect_ivl ();
add_timer (interval, reconnect_timer_id);
_status = waiting_for_reconnect_time;
_socket->event_connect_retried (_endpoint, interval);
_socket->event_connect_retried (
make_unconnected_connect_endpoint_pair (_endpoint), interval);
}
}
......@@ -443,7 +449,8 @@ void zmq::socks_connecter_t::close ()
const int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (_endpoint, _s);
_socket->event_closed (make_unconnected_connect_endpoint_pair (_endpoint),
_s);
_s = retired_fd;
}
......
......@@ -42,6 +42,7 @@ class io_thread_t;
class session_base_t;
struct address_t;
// TODO consider refactoring this to derive from stream_connecter_base_t
class socks_connecter_t : public own_t, public io_object_t
{
public:
......
......@@ -94,7 +94,8 @@ void zmq::stream_connecter_base_t::add_reconnect_timer ()
if (options.reconnect_ivl != -1) {
const int interval = get_new_reconnect_ivl ();
add_timer (interval, reconnect_timer_id);
_socket->event_connect_retried (_endpoint, interval);
_socket->event_connect_retried (
make_unconnected_connect_endpoint_pair (_endpoint), interval);
_reconnect_timer_started = true;
}
}
......@@ -131,7 +132,8 @@ void zmq::stream_connecter_base_t::close ()
const int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (_endpoint, _s);
_socket->event_closed (make_unconnected_connect_endpoint_pair (_endpoint),
_s);
_s = retired_fd;
}
......@@ -143,11 +145,15 @@ void zmq::stream_connecter_base_t::in_event ()
out_event ();
}
void zmq::stream_connecter_base_t::create_engine (fd_t fd)
void zmq::stream_connecter_base_t::create_engine (
fd_t fd, const std::string &local_address_)
{
const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint,
endpoint_type_connect);
// Create the engine object for this connection.
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, _endpoint);
new (std::nothrow) stream_engine_t (fd, options, endpoint_pair);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
......@@ -156,7 +162,7 @@ void zmq::stream_connecter_base_t::create_engine (fd_t fd)
// Shut the connecter down.
terminate ();
_socket->event_connected (_endpoint, fd);
_socket->event_connected (endpoint_pair, fd);
}
void zmq::stream_connecter_base_t::timer_event (int id_)
......
......@@ -63,7 +63,7 @@ class stream_connecter_base_t : public own_t, public io_object_t
void timer_event (int id_);
// Internal function to create the engine after connection was established.
void create_engine (fd_t fd);
void create_engine (fd_t fd, const std::string &local_address_);
// Internal function to add a reconnect timer
void add_reconnect_timer ();
......
......@@ -100,9 +100,10 @@ static std::string get_peer_address (zmq::fd_t s_)
}
zmq::stream_engine_t::stream_engine_t (fd_t fd_,
const options_t &options_,
const std::string &endpoint_) :
zmq::stream_engine_t::stream_engine_t (
fd_t fd_,
const options_t &options_,
const endpoint_uri_pair_t &endpoint_uri_pair_) :
_s (fd_),
_handle (static_cast<handle_t> (NULL)),
_inpos (NULL),
......@@ -117,7 +118,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_,
_greeting_bytes_read (0),
_session (NULL),
_options (options_),
_endpoint (endpoint_),
_endpoint_uri_pair (endpoint_uri_pair_),
_plugged (false),
_next_msg (&stream_engine_t::routing_id_msg),
_process_msg (&stream_engine_t::process_routing_id_msg),
......@@ -894,9 +895,9 @@ void zmq::stream_engine_t::zap_msg_available ()
restart_output ();
}
const char *zmq::stream_engine_t::get_endpoint () const
const zmq::endpoint_uri_pair_t &zmq::stream_engine_t::get_endpoint () const
{
return _endpoint.c_str ();
return _endpoint_uri_pair;
}
void zmq::stream_engine_t::mechanism_ready ()
......@@ -960,7 +961,7 @@ void zmq::stream_engine_t::mechanism_ready ()
alloc_assert (_metadata);
}
_socket->event_handshake_succeeded (_endpoint, 0);
_socket->event_handshake_succeeded (_endpoint_uri_pair, 0);
}
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
......@@ -1081,10 +1082,10 @@ void zmq::stream_engine_t::error (error_reason_t reason_)
&& (_mechanism == NULL
|| _mechanism->status () == mechanism_t::handshaking)) {
int err = errno;
_socket->event_handshake_failed_no_detail (_endpoint, err);
_socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err);
}
_socket->event_disconnected (_endpoint, _s);
_socket->event_disconnected (_endpoint_uri_pair, _s);
_session->flush ();
_session->engine_error (reason_);
unplug ();
......
......@@ -70,7 +70,7 @@ class stream_engine_t : public io_object_t, public i_engine
stream_engine_t (fd_t fd_,
const options_t &options_,
const std::string &endpoint_);
const endpoint_uri_pair_t &endpoint_uri_pair_);
~stream_engine_t ();
// i_engine interface implementation.
......@@ -79,7 +79,7 @@ class stream_engine_t : public io_object_t, public i_engine
bool restart_input ();
void restart_output ();
void zap_msg_available ();
const char *get_endpoint () const;
const endpoint_uri_pair_t &get_endpoint () const;
// i_poll_events interface implementation.
void in_event ();
......@@ -190,8 +190,8 @@ class stream_engine_t : public io_object_t, public i_engine
const options_t _options;
// String representation of endpoint
const std::string _endpoint;
// Representation of the connected endpoints.
const endpoint_uri_pair_t _endpoint_uri_pair;
bool _plugged;
......
......@@ -51,15 +51,10 @@ zmq::stream_listener_base_t::~stream_listener_base_t ()
zmq_assert (!_handle);
}
zmq::zmq_socklen_t
zmq::stream_listener_base_t::get_socket_address (sockaddr_storage *ss_) const
int zmq::stream_listener_base_t::get_local_address (std::string &addr_) const
{
zmq_socklen_t sl = static_cast<zmq_socklen_t> (sizeof (*ss_));
const int rc =
getsockname (_s, reinterpret_cast<struct sockaddr *> (ss_), &sl);
return rc != 0 ? 0 : sl;
addr_ = get_socket_name (_s, socket_end_local);
return addr_.empty () ? -1 : 0;
}
void zmq::stream_listener_base_t::process_plug ()
......@@ -89,7 +84,7 @@ int zmq::stream_listener_base_t::close ()
const int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (_endpoint, _s);
_socket->event_closed (make_unconnected_bind_endpoint_pair (_endpoint), _s);
_s = retired_fd;
return 0;
......@@ -97,8 +92,12 @@ int zmq::stream_listener_base_t::close ()
void zmq::stream_listener_base_t::create_engine (fd_t fd)
{
const endpoint_uri_pair_t endpoint_pair (
get_socket_name (fd, socket_end_local),
get_socket_name (fd, socket_end_remote), endpoint_type_bind);
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, _endpoint);
new (std::nothrow) stream_engine_t (fd, options, endpoint_pair);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
......@@ -113,5 +112,6 @@ void zmq::stream_listener_base_t::create_engine (fd_t fd)
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
_socket->event_accepted (_endpoint, fd);
_socket->event_accepted (endpoint_pair, fd);
}
......@@ -36,19 +36,13 @@
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
#include "tipc_address.hpp"
#include "address.hpp"
namespace zmq
{
class io_thread_t;
class socket_base_t;
#if defined(ZMQ_HAVE_HPUX) || defined(ZMQ_HAVE_VXWORKS)
typedef int zmq_socklen_t;
#else
typedef socklen_t zmq_socklen_t;
#endif
class stream_listener_base_t : public own_t, public io_object_t
{
public:
......@@ -57,8 +51,12 @@ class stream_listener_base_t : public own_t, public io_object_t
const options_t &options_);
~stream_listener_base_t ();
// Get the bound address for use with wildcards
int get_local_address (std::string &addr_) const;
protected:
zmq_socklen_t get_socket_address (sockaddr_storage *ss_) const;
virtual std::string get_socket_name (fd_t fd_,
socket_end_t socket_end_) const = 0;
private:
// Handlers for incoming commands.
......
......@@ -111,7 +111,7 @@ void zmq::tcp_connecter_t::out_event ()
return;
}
create_engine (fd);
create_engine (fd, get_socket_name<tcp_address_t> (fd, socket_end_local));
}
void zmq::tcp_connecter_t::timer_event (int id_)
......@@ -140,7 +140,8 @@ void zmq::tcp_connecter_t::start_connecting ()
else if (rc == -1 && errno == EINPROGRESS) {
_handle = add_fd (_s);
set_pollout (_handle);
_socket->event_connect_delayed (_endpoint, zmq_errno ());
_socket->event_connect_delayed (
make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
// add userspace connect timeout
add_connect_timer ();
......
......@@ -40,6 +40,7 @@
#include "ip.hpp"
#include "tcp.hpp"
#include "socket_base.hpp"
#include "address.hpp"
#ifndef ZMQ_HAVE_WINDOWS
#include <unistd.h>
......@@ -72,7 +73,8 @@ void zmq::tcp_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) {
_socket->event_accept_failed (_endpoint, zmq_errno ());
_socket->event_accept_failed (
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return;
}
......@@ -83,7 +85,8 @@ void zmq::tcp_listener_t::in_event ()
options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt);
if (rc != 0) {
_socket->event_accept_failed (_endpoint, zmq_errno ());
_socket->event_accept_failed (
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return;
}
......@@ -91,35 +94,20 @@ void zmq::tcp_listener_t::in_event ()
create_engine (fd);
}
int zmq::tcp_listener_t::get_address (std::string &addr_)
std::string
zmq::tcp_listener_t::get_socket_name (zmq::fd_t fd_,
socket_end_t socket_end_) const
{
// Get the details of the TCP socket
struct sockaddr_storage ss;
const zmq_socklen_t sl = get_socket_address (&ss);
if (!sl) {
addr_.clear ();
return -1;
}
tcp_address_t addr (reinterpret_cast<struct sockaddr *> (&ss), sl);
return addr.to_string (addr_);
return zmq::get_socket_name<tcp_address_t> (fd_, socket_end_);
}
int zmq::tcp_listener_t::set_address (const char *addr_)
int zmq::tcp_listener_t::create_socket (const char *addr_)
{
// Convert the textual address into address structure.
int rc = _address.resolve (addr_, true, options.ipv6);
if (rc != 0)
return -1;
_address.to_string (_endpoint);
if (options.use_fd != -1) {
_s = options.use_fd;
_socket->event_listening (_endpoint, _s);
return 0;
}
// Create a listening socket.
_s = open_socket (_address.family (), SOCK_STREAM, IPPROTO_TCP);
......@@ -203,7 +191,6 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
goto error;
#endif
_socket->event_listening (_endpoint, _s);
return 0;
error:
......@@ -213,6 +200,24 @@ error:
return -1;
}
int zmq::tcp_listener_t::set_local_address (const char *addr_)
{
if (options.use_fd != -1) {
// in this case, the addr_ passed is not used and ignored, since the
// socket was already created by the application
_s = options.use_fd;
} else {
if (create_socket (addr_) == -1)
return -1;
}
_endpoint = get_socket_name (_s, socket_end_local);
_socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint),
_s);
return 0;
}
zmq::fd_t zmq::tcp_listener_t::accept ()
{
// The situation where connection cannot be accepted due to insufficient
......
......@@ -44,10 +44,10 @@ class tcp_listener_t : public stream_listener_base_t
const options_t &options_);
// Set address to listen on.
int set_address (const char *addr_);
int set_local_address (const char *addr_);
// Get the bound address for use with wildcard
int get_address (std::string &addr_);
protected:
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
private:
// Handlers for I/O events.
......@@ -59,6 +59,8 @@ class tcp_listener_t : public stream_listener_base_t
// or was denied because of accept filters.
fd_t accept ();
int create_socket (const char *addr_);
// Address to listen on.
tcp_address_t _address;
......
......@@ -130,7 +130,7 @@ int zmq::tipc_address_t::resolve (const char *name)
return EINVAL;
}
int zmq::tipc_address_t::to_string (std::string &addr_)
int zmq::tipc_address_t::to_string (std::string &addr_) const
{
if (address.family != AF_TIPC) {
addr_.clear ();
......
......@@ -55,7 +55,7 @@ class tipc_address_t
int resolve (const char *name);
// The opposite to resolve()
int to_string (std::string &addr_);
int to_string (std::string &addr_) const;
// Handling different TIPC address types
bool is_service () const;
......
......@@ -76,7 +76,7 @@ void zmq::tipc_connecter_t::out_event ()
return;
}
create_engine (fd);
create_engine (fd, get_socket_name<tipc_address_t> (fd, socket_end_local));
}
void zmq::tipc_connecter_t::start_connecting ()
......@@ -94,7 +94,8 @@ void zmq::tipc_connecter_t::start_connecting ()
else if (rc == -1 && errno == EINPROGRESS) {
_handle = add_fd (_s);
set_pollout (_handle);
_socket->event_connect_delayed (_endpoint, zmq_errno ());
_socket->event_connect_delayed (
make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
}
// Handle any other error condition by eventual reconnect.
......
......@@ -43,6 +43,7 @@
#include "err.hpp"
#include "ip.hpp"
#include "socket_base.hpp"
#include "address.hpp"
#include <unistd.h>
#include <sys/socket.h>
......@@ -68,7 +69,8 @@ void zmq::tipc_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) {
_socket->event_accept_failed (_endpoint, zmq_errno ());
_socket->event_accept_failed (
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return;
}
......@@ -76,20 +78,14 @@ void zmq::tipc_listener_t::in_event ()
create_engine (fd);
}
int zmq::tipc_listener_t::get_address (std::string &addr_)
std::string
zmq::tipc_listener_t::get_socket_name (zmq::fd_t fd_,
socket_end_t socket_end_) const
{
struct sockaddr_storage ss;
const zmq_socklen_t sl = get_socket_address (&ss);
if (!sl) {
addr_.clear ();
return -1;
}
tipc_address_t addr ((struct sockaddr *) &ss, sl);
return addr.to_string (addr_);
return zmq::get_socket_name<tipc_address_t> (fd_, socket_end_);
}
int zmq::tipc_listener_t::set_address (const char *addr_)
int zmq::tipc_listener_t::set_local_address (const char *addr_)
{
// Convert str to address struct
int rc = _address.resolve (addr_);
......@@ -111,7 +107,7 @@ int zmq::tipc_listener_t::set_address (const char *addr_)
// If random Port Identity, update address object to reflect the assigned address
if (_address.is_random ()) {
struct sockaddr_storage ss;
const zmq_socklen_t sl = get_socket_address (&ss);
const zmq_socklen_t sl = get_socket_address (_s, socket_end_local, &ss);
if (sl == 0)
goto error;
......@@ -137,7 +133,8 @@ int zmq::tipc_listener_t::set_address (const char *addr_)
if (rc != 0)
goto error;
_socket->event_listening (_endpoint, _s);
_socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint),
_s);
return 0;
error:
......
......@@ -50,10 +50,10 @@ class tipc_listener_t : public stream_listener_base_t
const options_t &options_);
// Set address to listen on.
int set_address (const char *addr_);
int set_local_address (const char *addr_);
// Get the bound address for use with wildcards
int get_address (std::string &addr_);
protected:
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
private:
// Handlers for I/O events.
......
......@@ -444,9 +444,9 @@ void zmq::udp_engine_t::out_event ()
reset_pollout (_handle);
}
const char *zmq::udp_engine_t::get_endpoint () const
const zmq::endpoint_uri_pair_t &zmq::udp_engine_t::get_endpoint () const
{
return "";
return _empty_endpoint;
}
void zmq::udp_engine_t::restart_output ()
......
......@@ -43,12 +43,14 @@ class udp_engine_t : public io_object_t, public i_engine
void in_event ();
void out_event ();
const char *get_endpoint () const;
const endpoint_uri_pair_t &get_endpoint () const;
private:
int resolve_raw_address (char *addr_, size_t length_);
void sockaddr_to_msg (zmq::msg_t *msg_, sockaddr_in *addr_);
const endpoint_uri_pair_t _empty_endpoint;
bool _plugged;
fd_t _fd;
......
......@@ -45,6 +45,7 @@ class io_thread_t;
class session_base_t;
struct address_t;
// TODO consider refactoring this to derive from stream_connecter_base_t
class vmci_connecter_t : public own_t, public io_object_t
{
public:
......
......@@ -125,7 +125,7 @@ void zmq::vmci_listener_t::in_event ()
socket->event_accepted (endpoint, fd);
}
int zmq::vmci_listener_t::get_address (std::string &addr_)
int zmq::vmci_listener_t::get_local_address (std::string &addr_)
{
struct sockaddr_storage ss;
#ifdef ZMQ_HAVE_HPUX
......@@ -143,7 +143,7 @@ int zmq::vmci_listener_t::get_address (std::string &addr_)
return addr.to_string (addr_);
}
int zmq::vmci_listener_t::set_address (const char *addr_)
int zmq::vmci_listener_t::set_local_address (const char *addr_)
{
// Create addr on stack for auto-cleanup
std::string addr (addr_);
......
......@@ -46,6 +46,7 @@ namespace zmq
class io_thread_t;
class socket_base_t;
// TODO consider refactoring this to derive from stream_listener_base_t
class vmci_listener_t : public own_t, public io_object_t
{
public:
......@@ -55,10 +56,10 @@ class vmci_listener_t : public own_t, public io_object_t
~vmci_listener_t ();
// Set address to listen on.
int set_address (const char *addr_);
int set_local_address (const char *addr_);
// Get the bound address for use with wildcards
int get_address (std::string &addr_);
int get_local_address (std::string &addr_);
private:
// Handlers for incoming commands.
......
......@@ -267,12 +267,20 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
return s->getsockopt (option_, optval_, optvallen_);
}
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
int zmq_socket_monitor_versioned (void *s_,
const char *addr_,
uint64_t events_,
int event_version_)
{
zmq::socket_base_t *s = as_socket_base_t (s_);
if (!s)
return -1;
return s->monitor (addr_, events_);
return s->monitor (addr_, events_, event_version_);
}
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
{
return zmq_socket_monitor_versioned (s_, addr_, events_, 1);
}
int zmq_join (void *s_, const char *group_)
......
......@@ -123,6 +123,11 @@ int zmq_socket_get_peer_state (void *socket_,
const void *routing_id_,
size_t routing_id_size_);
int zmq_socket_monitor_versioned (void *s_,
const char *addr_,
uint64_t events_,
int event_version_);
#endif // ZMQ_BUILD_DRAFT_API
#endif //ifndef __ZMQ_DRAFT_H_INCLUDED__
......@@ -186,7 +186,7 @@ foreach(test ${tests})
add_executable(${test} ${test}.cpp
"testutil_security.hpp")
else()
add_executable(${test} ${test}.cpp "testutil.hpp" "testutil_unity.hpp")
add_executable(${test} ${test}.cpp "testutil.hpp" "testutil_unity.hpp" "testutil_monitoring.hpp")
endif()
if(WIN32)
# This is the output for Debug dynamic builds on Visual Studio 6.0
......
......@@ -28,58 +28,65 @@
*/
#include "testutil.hpp"
#include "testutil_security.hpp"
#include "testutil_monitoring.hpp"
int main (void)
#include "testutil_unity.hpp"
void setUp ()
{
setup_test_environment ();
setup_test_context ();
}
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void tearDown ()
{
teardown_test_context ();
}
// We'll monitor these two sockets
void *client = zmq_socket (ctx, ZMQ_DEALER);
assert (client);
void *server = zmq_socket (ctx, ZMQ_DEALER);
assert (server);
void test_monitor_invalid_protocol_fails ()
{
void *client = test_context_socket (ZMQ_DEALER);
// Socket monitoring only works over inproc://
int rc = zmq_socket_monitor (client, "tcp://127.0.0.1:*", 0);
assert (rc == -1);
assert (zmq_errno () == EPROTONOSUPPORT);
TEST_ASSERT_FAILURE_ERRNO (
EPROTONOSUPPORT, zmq_socket_monitor (client, "tcp://127.0.0.1:*", 0));
test_context_socket_close_zero_linger (client);
}
void test_monitor_basic ()
{
char my_endpoint[MAX_SOCKET_STRING];
// We'll monitor these two sockets
void *client = test_context_socket (ZMQ_DEALER);
void *server = test_context_socket (ZMQ_DEALER);
// Monitor all events on client and server sockets
rc = zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL);
assert (rc == 0);
rc = zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL);
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL));
// Create two sockets for collecting monitor events
void *client_mon = zmq_socket (ctx, ZMQ_PAIR);
assert (client_mon);
void *server_mon = zmq_socket (ctx, ZMQ_PAIR);
assert (server_mon);
void *client_mon = test_context_socket (ZMQ_PAIR);
void *server_mon = test_context_socket (ZMQ_PAIR);
// Connect these to the inproc endpoints so they'll get events
rc = zmq_connect (client_mon, "inproc://monitor-client");
assert (rc == 0);
rc = zmq_connect (server_mon, "inproc://monitor-server");
assert (rc == 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_connect (client_mon, "inproc://monitor-client"));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_connect (server_mon, "inproc://monitor-server"));
// Now do a basic ping test
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
bind_loopback_ipv4 (server, my_endpoint, sizeof my_endpoint);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint));
bounce (server, client);
// Close client and server
close_zero_linger (client);
close_zero_linger (server);
// TODO why does this use zero_linger?
test_context_socket_close_zero_linger (client);
test_context_socket_close_zero_linger (server);
// Now collect and check events from both sockets
int event = get_monitor_event (client_mon, NULL, NULL);
......@@ -96,17 +103,129 @@ int main (void)
event = get_monitor_event (server_mon, NULL, NULL);
// Sometimes the server sees the client closing before it gets closed.
if (event != ZMQ_EVENT_DISCONNECTED) {
assert (event == ZMQ_EVENT_CLOSED);
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_CLOSED, event);
event = get_monitor_event (server_mon, NULL, NULL);
}
if (event != ZMQ_EVENT_DISCONNECTED) {
assert (event == ZMQ_EVENT_MONITOR_STOPPED);
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event);
}
// Close down the sockets
close_zero_linger (client_mon);
close_zero_linger (server_mon);
zmq_ctx_term (ctx);
// TODO why does this use zero_linger?
test_context_socket_close_zero_linger (client_mon);
test_context_socket_close_zero_linger (server_mon);
}
#ifdef ZMQ_BUILD_DRAFT_API
void test_monitor_versioned_basic (bind_function_t bind_function_,
const char *expected_prefix_)
{
char server_endpoint[MAX_SOCKET_STRING];
// We'll monitor these two sockets
void *client = test_context_socket (ZMQ_DEALER);
void *server = test_context_socket (ZMQ_DEALER);
// Monitor all events on client and server sockets
TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned (
client, "inproc://monitor-client", ZMQ_EVENT_ALL_V2, 2));
TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned (
server, "inproc://monitor-server", ZMQ_EVENT_ALL_V2, 2));
// Create two sockets for collecting monitor events
void *client_mon = test_context_socket (ZMQ_PAIR);
void *server_mon = test_context_socket (ZMQ_PAIR);
// Connect these to the inproc endpoints so they'll get events
TEST_ASSERT_SUCCESS_ERRNO (
zmq_connect (client_mon, "inproc://monitor-client"));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_connect (server_mon, "inproc://monitor-server"));
// Now do a basic ping test
bind_function_ (server, server_endpoint, sizeof server_endpoint);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, server_endpoint));
bounce (server, client);
// Close client and server
// TODO why does this use zero_linger?
test_context_socket_close_zero_linger (client);
test_context_socket_close_zero_linger (server);
char *client_local_address = NULL;
char *client_remote_address = NULL;
// Now collect and check events from both sockets
int64_t event = get_monitor_event_v2 (
client_mon, NULL, &client_local_address, &client_remote_address);
if (event == ZMQ_EVENT_CONNECT_DELAYED) {
free (client_local_address);
free (client_remote_address);
event = get_monitor_event_v2 (client_mon, NULL, &client_local_address,
&client_remote_address);
}
TEST_ASSERT_EQUAL (ZMQ_EVENT_CONNECTED, event);
TEST_ASSERT_EQUAL_STRING (server_endpoint, client_remote_address);
TEST_ASSERT_EQUAL_STRING_LEN (expected_prefix_, client_local_address,
strlen (expected_prefix_));
TEST_ASSERT_NOT_EQUAL (
0, strcmp (client_local_address, client_remote_address));
expect_monitor_event_v2 (client_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED,
client_local_address, client_remote_address);
expect_monitor_event_v2 (client_mon, ZMQ_EVENT_MONITOR_STOPPED, "", "");
// This is the flow of server events
expect_monitor_event_v2 (server_mon, ZMQ_EVENT_LISTENING,
client_remote_address, "");
expect_monitor_event_v2 (server_mon, ZMQ_EVENT_ACCEPTED,
client_remote_address, client_local_address);
expect_monitor_event_v2 (server_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED,
client_remote_address, client_local_address);
event = get_monitor_event_v2 (server_mon, NULL, NULL, NULL);
// Sometimes the server sees the client closing before it gets closed.
if (event != ZMQ_EVENT_DISCONNECTED) {
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_CLOSED, event);
event = get_monitor_event_v2 (server_mon, NULL, NULL, NULL);
}
if (event != ZMQ_EVENT_DISCONNECTED) {
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event);
}
free (client_local_address);
free (client_remote_address);
// Close down the sockets
// TODO why does this use zero_linger?
test_context_socket_close_zero_linger (client_mon);
test_context_socket_close_zero_linger (server_mon);
}
void test_monitor_versioned_basic_tcp_ipv4 ()
{
static const char prefix[] = "tcp://127.0.0.1:";
test_monitor_versioned_basic (bind_loopback_ipv4, prefix);
}
void test_monitor_versioned_basic_tcp_ipv6 ()
{
static const char prefix[] = "tcp://[::1]:";
test_monitor_versioned_basic (bind_loopback_ipv6, prefix);
}
#endif
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_monitor_invalid_protocol_fails);
RUN_TEST (test_monitor_basic);
#ifdef ZMQ_BUILD_DRAFT_API
RUN_TEST (test_monitor_versioned_basic_tcp_ipv4);
RUN_TEST (test_monitor_versioned_basic_tcp_ipv6);
#endif
return 0;
return UNITY_END ();
}
This diff is collapsed.
......@@ -31,6 +31,7 @@
#define __TESTUTIL_SECURITY_HPP_INCLUDED__
#include "testutil.hpp"
#include "testutil_monitoring.hpp"
// security test utils
......@@ -322,163 +323,7 @@ void zap_handler (void *ctx_)
zap_handler_generic (ctx_, zap_ok);
}
// Monitor event utilities
// Read one event off the monitor socket; return value and address
// by reference, if not null, and event number by value. Returns -1
// in case of error.
static int get_monitor_event_internal (void *monitor_,
int *value_,
char **address_,
int recv_flag_)
{
// First frame in message contains event number and value
zmq_msg_t msg;
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
assert (errno == EAGAIN);
return -1; // timed out or no message available
}
assert (zmq_msg_more (&msg));
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
uint16_t event = *(uint16_t *) (data);
if (value_)
memcpy (value_, data + 2, sizeof (uint32_t));
// Second frame in message contains event address
zmq_msg_init (&msg);
int res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1;
assert (res != -1);
assert (!zmq_msg_more (&msg));
if (address_) {
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
size_t size = zmq_msg_size (&msg);
*address_ = (char *) malloc (size + 1);
memcpy (*address_, data, size);
*address_[size] = 0;
}
return event;
}
int get_monitor_event_with_timeout (void *monitor_,
int *value_,
char **address_,
int timeout_)
{
int res;
if (timeout_ == -1) {
// process infinite timeout in small steps to allow the user
// to see some information on the console
int timeout_step = 250;
int wait_time = 0;
zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_step,
sizeof (timeout_step));
while (
(res = get_monitor_event_internal (monitor_, value_, address_, 0))
== -1) {
wait_time += timeout_step;
fprintf (stderr, "Still waiting for monitor event after %i ms\n",
wait_time);
}
} else {
zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_, sizeof (timeout_));
res = get_monitor_event_internal (monitor_, value_, address_, 0);
}
int timeout_infinite = -1;
zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_infinite,
sizeof (timeout_infinite));
return res;
}
int get_monitor_event (void *monitor_, int *value_, char **address_)
{
return get_monitor_event_with_timeout (monitor_, value_, address_, -1);
}
void expect_monitor_event (void *monitor_, int expected_event_)
{
int event = get_monitor_event (monitor_, NULL, NULL);
if (event != expected_event_) {
fprintf (stderr, "Expected monitor event %x but received %x\n",
expected_event_, event);
assert (event == expected_event_);
}
}
void print_unexpected_event (int event_,
int err_,
int expected_event_,
int expected_err_)
{
fprintf (stderr,
"Unexpected event: 0x%x, value = %i/0x%x (expected: 0x%x, value "
"= %i/0x%x)\n",
event_, err_, err_, expected_event_, expected_err_, expected_err_);
}
// expects that one or more occurrences of the expected event are received
// via the specified socket monitor
// returns the number of occurrences of the expected event
// interrupts, if a ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL with EPIPE, ECONNRESET
// or ECONNABORTED occurs; in this case, 0 is returned
// this should be investigated further, see
// https://github.com/zeromq/libzmq/issues/2644
int expect_monitor_event_multiple (void *server_mon_,
int expected_event_,
int expected_err_ = -1,
bool optional_ = false)
{
int count_of_expected_events = 0;
int client_closed_connection = 0;
int timeout = 250;
int wait_time = 0;
int event;
int err;
while ((event =
get_monitor_event_with_timeout (server_mon_, &err, NULL, timeout))
!= -1
|| !count_of_expected_events) {
if (event == -1) {
if (optional_)
break;
wait_time += timeout;
fprintf (stderr,
"Still waiting for first event after %ims (expected event "
"%x (value %i/0x%x))\n",
wait_time, expected_event_, expected_err_, expected_err_);
continue;
}
// ignore errors with EPIPE/ECONNRESET/ECONNABORTED, which can happen
// ECONNRESET can happen on very slow machines, when the engine writes
// to the peer and then tries to read the socket before the peer reads
// ECONNABORTED happens when a client aborts a connection via RST/timeout
if (event == ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
&& ((err == EPIPE && expected_err_ != EPIPE) || err == ECONNRESET
|| err == ECONNABORTED)) {
fprintf (stderr,
"Ignored event (skipping any further events): %x (err = "
"%i == %s)\n",
event, err, zmq_strerror (err));
client_closed_connection = 1;
break;
}
if (event != expected_event_
|| (-1 != expected_err_ && err != expected_err_)) {
print_unexpected_event (event, err, expected_event_, expected_err_);
assert (false);
}
++count_of_expected_events;
}
assert (optional_ || count_of_expected_events > 0
|| client_closed_connection);
return count_of_expected_events;
}
// Security-specific monitor event utilities
// assert_* are macros rather than functions, to allow assertion failures be
// attributed to the causing source code line
......
......@@ -313,7 +313,16 @@ void bind_loopback (void *socket_, int ipv6_, char *my_endpoint_, size_t len_)
my_endpoint_, len_);
}
typedef void (*bind_function_t) (void *socket_,
char *my_endpoint_,
size_t len_);
void bind_loopback_ipv4 (void *socket_, char *my_endpoint_, size_t len_)
{
bind_loopback (socket_, false, my_endpoint_, len_);
}
void bind_loopback_ipv6 (void *socket_, char *my_endpoint_, size_t len_)
{
bind_loopback (socket_, true, my_endpoint_, len_);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment