Commit 01371398 authored by Simon Giesecke's avatar Simon Giesecke Committed by Simon Giesecke

Problem: monitor events are unversioned

Solution: add monitor event socket option
parent 2e735546
......@@ -639,6 +639,7 @@ set(cxx-sources
devpoll.cpp
dgram.cpp
dist.cpp
endpoint.cpp
epoll.cpp
err.cpp
fq.cpp
......@@ -746,6 +747,7 @@ set(cxx-sources
dish.hpp
dist.hpp
encoder.hpp
endpoint.hpp
epoll.hpp
err.hpp
fd.hpp
......
......@@ -55,6 +55,8 @@ src_libzmq_la_SOURCES = \
src/dist.cpp \
src/dist.hpp \
src/encoder.hpp \
src/endpoint.hpp \
src/endpoint.cpp \
src/epoll.cpp \
src/epoll.hpp \
src/err.cpp \
......
......@@ -723,6 +723,11 @@ ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket,
const void *routing_id,
size_t routing_id_size);
ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_,
const char *addr_,
uint64_t events_,
int event_version_);
#endif // ZMQ_BUILD_DRAFT_API
......
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "endpoint.hpp"
zmq::endpoint_uri_pair_t
zmq::make_unconnected_connect_endpoint_pair (const std::string &endpoint_)
{
return endpoint_uri_pair_t (std::string (), endpoint_,
endpoint_type_connect);
}
zmq::endpoint_uri_pair_t
zmq::make_unconnected_bind_endpoint_pair (const std::string &endpoint_)
{
return endpoint_uri_pair_t (endpoint_, std::string (), endpoint_type_bind);
}
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ENDPOINT_HPP_INCLUDED__
#define __ZMQ_ENDPOINT_HPP_INCLUDED__
#include <string>
namespace zmq
{
enum endpoint_type_t
{
endpoint_type_none, // a connection-less endpoint
endpoint_type_bind, // a connection-oriented bind endpoint
endpoint_type_connect // a connection-oriented connect endpoint
};
struct endpoint_uri_pair_t
{
endpoint_uri_pair_t () : local_type (endpoint_type_none) {}
endpoint_uri_pair_t (const std::string &local,
const std::string &remote,
endpoint_type_t local_type) :
local (local),
remote (remote),
local_type (local_type)
{
}
const std::string &identifier () const
{
return local_type == endpoint_type_bind ? local : remote;
}
std::string local, remote;
endpoint_type_t local_type;
};
endpoint_uri_pair_t
make_unconnected_connect_endpoint_pair (const std::string &endpoint_);
endpoint_uri_pair_t
make_unconnected_bind_endpoint_pair (const std::string &endpoint_);
}
#endif
......@@ -30,6 +30,8 @@
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
#define __ZMQ_I_ENGINE_HPP_INCLUDED__
#include "endpoint.hpp"
namespace zmq
{
class io_thread_t;
......@@ -61,7 +63,7 @@ struct i_engine
virtual void zap_msg_available () = 0;
virtual const char *get_endpoint () const = 0;
virtual const endpoint_uri_pair_t &get_endpoint () const = 0;
};
}
......
......@@ -92,7 +92,8 @@ void zmq::ipc_connecter_t::start_connecting ()
else if (rc == -1 && errno == EINPROGRESS) {
_handle = add_fd (_s);
set_pollout (_handle);
_socket->event_connect_delayed (_endpoint, zmq_errno ());
_socket->event_connect_delayed (
make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
// TODO, tcp_connecter_t adds a connect timer in this case; maybe this
// should be done here as well (and then this could be pulled up to
......
......@@ -142,7 +142,8 @@ void zmq::ipc_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) {
_socket->event_accept_failed (_endpoint, zmq_errno ());
_socket->event_accept_failed (
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return;
}
......@@ -224,7 +225,8 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
_filename = ZMQ_MOVE (addr);
_has_file = true;
_socket->event_listening (_endpoint, _s);
_socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint),
_s);
return 0;
error:
......@@ -252,12 +254,14 @@ int zmq::ipc_listener_t::close ()
}
if (rc != 0) {
_socket->event_close_failed (_endpoint, zmq_errno ());
_socket->event_close_failed (
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return -1;
}
}
_socket->event_closed (_endpoint, fd_for_event);
_socket->event_closed (make_unconnected_bind_endpoint_pair (_endpoint),
fd_for_event);
return 0;
}
......
......@@ -709,9 +709,9 @@ zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem ()
return nextItem;
} // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
const char *zmq::norm_engine_t::get_endpoint () const
const zmq::endpoint_uri_pair_t &zmq::norm_engine_t::get_endpoint () const
{
return "";
return _empty_endpoint;
}
#endif // ZMQ_HAVE_NORM
......@@ -47,7 +47,7 @@ class norm_engine_t : public io_object_t, public i_engine
virtual void zap_msg_available (){};
virtual const char *get_endpoint () const;
virtual const endpoint_uri_pair_t &get_endpoint () const;
// i_poll_events interface implementation.
// (we only need in_event() for NormEvent notification)
......@@ -150,6 +150,8 @@ class norm_engine_t : public io_object_t, public i_engine
}; // end class zmq::norm_engine_t::NormRxStreamState
const endpoint_uri_pair_t _empty_endpoint;
session_base_t *zmq_session;
options_t options;
NormInstanceHandle norm_instance;
......
......@@ -244,7 +244,8 @@ zmq::options_t::options_t () :
loopback_fastpath (false),
multicast_loop (true),
zero_copy (true),
router_notify (0)
router_notify (0),
monitor_event_version (1)
{
memset (curve_public_key, 0, CURVE_KEYSIZE);
memset (curve_secret_key, 0, CURVE_KEYSIZE);
......
......@@ -266,6 +266,9 @@ struct options_t
// Application metadata
std::map<std::string, std::string> app_metadata;
// Version of monitor events to emit
int monitor_event_version;
};
inline bool get_effective_conflate_option (const options_t &options)
......
......@@ -156,9 +156,9 @@ bool zmq::pgm_receiver_t::restart_input ()
return true;
}
const char *zmq::pgm_receiver_t::get_endpoint () const
const zmq::endpoint_uri_pair_t &zmq::pgm_receiver_t::get_endpoint () const
{
return "";
return _empty_endpoint;
}
void zmq::pgm_receiver_t::in_event ()
......
......@@ -60,7 +60,7 @@ class pgm_receiver_t : public io_object_t, public i_engine
bool restart_input ();
void restart_output ();
void zap_msg_available () {}
const char *get_endpoint () const;
const endpoint_uri_pair_t &get_endpoint () const;
// i_poll_events interface implementation.
void in_event ();
......@@ -84,6 +84,8 @@ class pgm_receiver_t : public io_object_t, public i_engine
rx_timer_id = 0xa1
};
const endpoint_uri_pair_t _empty_endpoint;
// RX timer is running.
bool has_rx_timer;
......
......@@ -143,9 +143,9 @@ bool zmq::pgm_sender_t::restart_input ()
return true;
}
const char *zmq::pgm_sender_t::get_endpoint () const
const zmq::endpoint_uri_pair_t &zmq::pgm_sender_t::get_endpoint () const
{
return "";
return _empty_endpoint;
}
zmq::pgm_sender_t::~pgm_sender_t ()
......
......@@ -59,7 +59,7 @@ class pgm_sender_t : public io_object_t, public i_engine
bool restart_input ();
void restart_output ();
void zap_msg_available () {}
const char *get_endpoint () const;
const endpoint_uri_pair_t &get_endpoint () const;
// i_poll_events interface implementation.
void in_event ();
......@@ -77,6 +77,8 @@ class pgm_sender_t : public io_object_t, public i_engine
rx_timer_id = 0xa1
};
const endpoint_uri_pair_t _empty_endpoint;
// Timers are running.
bool has_tx_timer;
bool has_rx_timer;
......
......@@ -554,12 +554,12 @@ void zmq::pipe_t::send_hwms_to_peer (int inhwm_, int outhwm_)
send_pipe_hwm (_peer, inhwm_, outhwm_);
}
void zmq::pipe_t::set_endpoint_uri (const char *name_)
void zmq::pipe_t::set_endpoint_pair (zmq::endpoint_uri_pair_t endpoint_pair_)
{
_endpoint_uri = name_;
_endpoint_pair = endpoint_pair_;
}
std::string &zmq::pipe_t::get_endpoint_uri ()
const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const
{
return _endpoint_uri;
return _endpoint_pair;
}
......@@ -37,6 +37,7 @@
#include "array.hpp"
#include "blob.hpp"
#include "options.hpp"
#include "endpoint.hpp"
namespace zmq
{
......@@ -141,8 +142,8 @@ class pipe_t : public object_t,
// Returns true if HWM is not reached
bool check_hwm () const;
void set_endpoint_uri (const char *name_);
std::string &get_endpoint_uri ();
void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_);
const endpoint_uri_pair_t &get_endpoint_pair () const;
private:
// Type of the underlying lock-free pipe.
......@@ -247,9 +248,8 @@ class pipe_t : public object_t,
const bool _conflate;
// If the pipe belongs to socket's endpoint the endpoint's name is stored here.
// Otherwise this is empty.
std::string _endpoint_uri;
// The endpoints of this pipe.
endpoint_uri_pair_t _endpoint_pair;
// Disable copying.
pipe_t (const pipe_t &);
......
......@@ -117,7 +117,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
{
}
const char *zmq::session_base_t::get_endpoint () const
const zmq::endpoint_uri_pair_t &zmq::session_base_t::get_endpoint () const
{
return _engine->get_endpoint ();
}
......
......@@ -92,7 +92,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
int write_zap_msg (msg_t *msg_);
socket_base_t *get_socket ();
const char *get_endpoint () const;
const endpoint_uri_pair_t &get_endpoint () const;
protected:
session_base_t (zmq::io_thread_t *io_thread_,
......
......@@ -31,6 +31,7 @@
#include <new>
#include <string>
#include <algorithm>
#include <limits>
#include "macros.hpp"
......@@ -591,7 +592,10 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_)
// Save last endpoint URI
paddr->to_string (_last_endpoint);
add_endpoint (endpoint_uri_, static_cast<own_t *> (session), newpipe);
// TODO shouldn't this use _last_endpoint instead of endpoint_uri_? as in the other cases
add_endpoint (endpoint_uri_pair_t (endpoint_uri_, std::string (),
endpoint_type_none),
static_cast<own_t *> (session), newpipe);
return 0;
}
......@@ -611,15 +615,16 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_)
rc = listener->set_address (address.c_str ());
if (rc != 0) {
LIBZMQ_DELETE (listener);
event_bind_failed (address, zmq_errno ());
event_bind_failed (make_unconnected_bind_endpoint_pair (address),
zmq_errno ());
return -1;
}
// Save last endpoint URI
listener->get_address (_last_endpoint);
add_endpoint (_last_endpoint.c_str (), static_cast<own_t *> (listener),
NULL);
add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
static_cast<own_t *> (listener), NULL);
options.connected = true;
return 0;
}
......@@ -633,15 +638,16 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_)
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
LIBZMQ_DELETE (listener);
event_bind_failed (address, zmq_errno ());
event_bind_failed (make_unconnected_bind_endpoint_pair (address),
zmq_errno ());
return -1;
}
// Save last endpoint URI
listener->get_address (_last_endpoint);
add_endpoint (_last_endpoint.c_str (), static_cast<own_t *> (listener),
NULL);
add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
static_cast<own_t *> (listener), NULL);
options.connected = true;
return 0;
}
......@@ -654,14 +660,17 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_)
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
LIBZMQ_DELETE (listener);
event_bind_failed (address, zmq_errno ());
event_bind_failed (make_unconnected_bind_endpoint_pair (address),
zmq_errno ());
return -1;
}
// Save last endpoint URI
listener->get_address (_last_endpoint);
add_endpoint (endpoint_uri_, static_cast<own_t *> (listener), NULL);
// TODO shouldn't this use _last_endpoint as in the other cases?
add_endpoint (make_unconnected_bind_endpoint_pair (endpoint_uri_),
static_cast<own_t *> (listener), NULL);
options.connected = true;
return 0;
}
......@@ -970,48 +979,49 @@ int zmq::socket_base_t::connect (const char *endpoint_uri_)
// Save last endpoint URI
paddr->to_string (_last_endpoint);
add_endpoint (endpoint_uri_, static_cast<own_t *> (session), newpipe);
add_endpoint (make_unconnected_connect_endpoint_pair (_last_endpoint),
static_cast<own_t *> (session), newpipe);
return 0;
}
std::string zmq::socket_base_t::resolve_tcp_addr (std::string endpoint_uri_,
const char *tcp_address_)
std::string
zmq::socket_base_t::resolve_tcp_addr (std::string endpoint_uri_pair_,
const char *tcp_address_)
{
// The resolved last_endpoint is used as a key in the endpoints map.
// The address passed by the user might not match in the TCP case due to
// IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
// resolve before giving up. Given at this stage we don't know whether a
// socket is connected or bound, try with both.
if (_endpoints.find (endpoint_uri_) == _endpoints.end ()) {
if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
alloc_assert (tcp_addr);
int rc = tcp_addr->resolve (tcp_address_, false, options.ipv6);
if (rc == 0) {
tcp_addr->to_string (endpoint_uri_);
if (_endpoints.find (endpoint_uri_) == _endpoints.end ()) {
tcp_addr->to_string (endpoint_uri_pair_);
if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
rc = tcp_addr->resolve (tcp_address_, true, options.ipv6);
if (rc == 0) {
tcp_addr->to_string (endpoint_uri_);
tcp_addr->to_string (endpoint_uri_pair_);
}
}
}
LIBZMQ_DELETE (tcp_addr);
}
return endpoint_uri_;
return endpoint_uri_pair_;
}
void zmq::socket_base_t::add_endpoint (const char *endpoint_uri_,
own_t *endpoint_,
pipe_t *pipe_)
void zmq::socket_base_t::add_endpoint (
const endpoint_uri_pair_t &endpoint_pair_, own_t *endpoint_, pipe_t *pipe_)
{
// Activate the session. Make it a child of this socket.
launch_child (endpoint_);
_endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (endpoint_uri_),
_endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (endpoint_pair_.identifier (),
endpoint_pipe_t (endpoint_, pipe_));
if (pipe_ != NULL)
pipe_->set_endpoint_uri (endpoint_uri_);
pipe_->set_endpoint_pair (endpoint_pair_);
}
int zmq::socket_base_t::term_endpoint (const char *endpoint_uri_)
......@@ -1059,7 +1069,7 @@ int zmq::socket_base_t::term_endpoint (const char *endpoint_uri_)
? resolve_tcp_addr (endpoint_uri_str, uri_path.c_str ())
: endpoint_uri_str;
// Find the endpoints range (if any) corresponding to the endpoint_uri_ string.
// Find the endpoints range (if any) corresponding to the endpoint_uri_pair_ string.
const std::pair<endpoints_t::iterator, endpoints_t::iterator> range =
_endpoints.equal_range (resolved_endpoint_uri);
if (range.first == range.second) {
......@@ -1560,9 +1570,10 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
_pipes.erase (pipe_);
// Remove the pipe from _endpoints (set it to NULL).
if (!pipe_->get_endpoint_uri ().empty ()) {
const std::string &identifier = pipe_->get_endpoint_pair ().identifier ();
if (!identifier.empty ()) {
std::pair<endpoints_t::iterator, endpoints_t::iterator> range;
range = _endpoints.equal_range (pipe_->get_endpoint_uri ());
range = _endpoints.equal_range (identifier);
for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
if (it->second.second == pipe_) {
......@@ -1586,7 +1597,9 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
_rcvmore = (msg_->flags () & msg_t::more) != 0;
}
int zmq::socket_base_t::monitor (const char *endpoint_, int events_)
int zmq::socket_base_t::monitor (const char *endpoint_,
uint64_t events_,
int event_version_)
{
scoped_lock_t lock (_monitor_sync);
......@@ -1595,6 +1608,12 @@ int zmq::socket_base_t::monitor (const char *endpoint_, int events_)
return -1;
}
// Event version 1 supports only first 16 events.
if (unlikely (event_version_ == 1 && events_ >> 16 != 0)) {
errno = EINVAL;
return -1;
}
// Support deregistering monitoring endpoints as well
if (endpoint_ == NULL) {
stop_monitor ();
......@@ -1617,6 +1636,7 @@ int zmq::socket_base_t::monitor (const char *endpoint_, int events_)
}
// Register events to monitor
_monitor_events = events_;
options.monitor_event_version = event_version_;
_monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
if (_monitor_socket == NULL)
return -1;
......@@ -1635,137 +1655,174 @@ int zmq::socket_base_t::monitor (const char *endpoint_, int events_)
return rc;
}
void zmq::socket_base_t::event_connected (const std::string &endpoint_uri_,
zmq::fd_t fd_)
void zmq::socket_base_t::event_connected (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{
event (endpoint_uri_, fd_, ZMQ_EVENT_CONNECTED);
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_CONNECTED);
}
void zmq::socket_base_t::event_connect_delayed (
const std::string &endpoint_uri_, int err_)
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_, err_, ZMQ_EVENT_CONNECT_DELAYED);
event (endpoint_uri_pair_, err_, ZMQ_EVENT_CONNECT_DELAYED);
}
void zmq::socket_base_t::event_connect_retried (
const std::string &endpoint_uri_, int interval_)
const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_)
{
event (endpoint_uri_, interval_, ZMQ_EVENT_CONNECT_RETRIED);
event (endpoint_uri_pair_, interval_, ZMQ_EVENT_CONNECT_RETRIED);
}
void zmq::socket_base_t::event_listening (const std::string &endpoint_uri_,
zmq::fd_t fd_)
void zmq::socket_base_t::event_listening (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{
event (endpoint_uri_, fd_, ZMQ_EVENT_LISTENING);
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_LISTENING);
}
void zmq::socket_base_t::event_bind_failed (const std::string &endpoint_uri_,
int err_)
void zmq::socket_base_t::event_bind_failed (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_, err_, ZMQ_EVENT_BIND_FAILED);
event (endpoint_uri_pair_, err_, ZMQ_EVENT_BIND_FAILED);
}
void zmq::socket_base_t::event_accepted (const std::string &endpoint_uri_,
zmq::fd_t fd_)
void zmq::socket_base_t::event_accepted (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{
event (endpoint_uri_, fd_, ZMQ_EVENT_ACCEPTED);
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_ACCEPTED);
}
void zmq::socket_base_t::event_accept_failed (const std::string &endpoint_uri_,
int err_)
void zmq::socket_base_t::event_accept_failed (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_, err_, ZMQ_EVENT_ACCEPT_FAILED);
event (endpoint_uri_pair_, err_, ZMQ_EVENT_ACCEPT_FAILED);
}
void zmq::socket_base_t::event_closed (const std::string &endpoint_uri_,
zmq::fd_t fd_)
void zmq::socket_base_t::event_closed (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{
event (endpoint_uri_, fd_, ZMQ_EVENT_CLOSED);
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_CLOSED);
}
void zmq::socket_base_t::event_close_failed (const std::string &endpoint_uri_,
int err_)
void zmq::socket_base_t::event_close_failed (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_, err_, ZMQ_EVENT_CLOSE_FAILED);
event (endpoint_uri_pair_, err_, ZMQ_EVENT_CLOSE_FAILED);
}
void zmq::socket_base_t::event_disconnected (const std::string &endpoint_uri_,
zmq::fd_t fd_)
void zmq::socket_base_t::event_disconnected (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{
event (endpoint_uri_, fd_, ZMQ_EVENT_DISCONNECTED);
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_DISCONNECTED);
}
void zmq::socket_base_t::event_handshake_failed_no_detail (
const std::string &endpoint_uri_, int err_)
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
}
void zmq::socket_base_t::event_handshake_failed_protocol (
const std::string &endpoint_uri_, int err_)
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
}
void zmq::socket_base_t::event_handshake_failed_auth (
const std::string &endpoint_uri_, int err_)
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
}
void zmq::socket_base_t::event_handshake_succeeded (
const std::string &endpoint_uri_, int err_)
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
}
void zmq::socket_base_t::event (const std::string &endpoint_uri_,
intptr_t value_,
int type_)
void zmq::socket_base_t::event (const endpoint_uri_pair_t &endpoint_uri_pair_,
uint64_t value_,
uint64_t type_)
{
scoped_lock_t lock (_monitor_sync);
if (_monitor_events & type_) {
monitor_event (type_, value_, endpoint_uri_);
monitor_event (type_, value_, endpoint_uri_pair_);
}
}
// Send a monitor event
void zmq::socket_base_t::monitor_event (int event_,
intptr_t value_,
const std::string &endpoint_uri_) const
void zmq::socket_base_t::monitor_event (
uint64_t event_,
uint64_t value_,
const endpoint_uri_pair_t &endpoint_uri_pair_) const
{
// this is a private method which is only called from
// contexts where the mutex has been locked before
// contexts where the _monitor_sync mutex has been locked before
if (_monitor_socket) {
// Send event in first frame
const uint16_t event = static_cast<uint16_t> (event_);
const uint32_t value = static_cast<uint32_t> (value_);
zmq_msg_t msg;
zmq_msg_init_size (&msg, sizeof (event) + sizeof (value));
uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
// Avoid dereferencing uint32_t on unaligned address
memcpy (data + 0, &event, sizeof (event));
memcpy (data + sizeof (event), &value, sizeof (value));
zmq_sendmsg (_monitor_socket, &msg, ZMQ_SNDMORE);
// Send address in second frame
zmq_msg_init_size (&msg, endpoint_uri_.size ());
memcpy (zmq_msg_data (&msg), endpoint_uri_.c_str (),
endpoint_uri_.size ());
zmq_sendmsg (_monitor_socket, &msg, 0);
switch (options.monitor_event_version) {
case 1: {
// The API should not allow to activate unsupported events
zmq_assert (event_ <= std::numeric_limits<uint16_t>::max ());
zmq_assert (value_ <= std::numeric_limits<uint32_t>::max ());
// Send event and value in first frame
const uint16_t event = static_cast<uint16_t> (event_);
const uint32_t value = static_cast<uint32_t> (value_);
zmq_msg_init_size (&msg, sizeof (event) + sizeof (value));
uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
// Avoid dereferencing uint32_t on unaligned address
memcpy (data + 0, &event, sizeof (event));
memcpy (data + sizeof (event), &value, sizeof (value));
zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
const std::string &endpoint_uri =
endpoint_uri_pair_.identifier ();
// Send address in second frame
zmq_msg_init_size (&msg, endpoint_uri.size ());
memcpy (zmq_msg_data (&msg), endpoint_uri.c_str (),
endpoint_uri.size ());
zmq_msg_send (&msg, _monitor_socket, 0);
} break;
case 2: {
// Send event in first frame (64bit unsigned)
zmq_msg_init_size (&msg, sizeof event_);
memcpy (zmq_msg_data (&msg), &event_, sizeof event_);
zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
// Send value in second frame (64bit unsigned)
zmq_msg_init_size (&msg, sizeof value_);
memcpy (zmq_msg_data (&msg), &value_, sizeof value_);
zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
// Send local endpoint URI in third frame (string)
zmq_msg_init_size (&msg, endpoint_uri_pair_.local.size ());
memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.local.c_str (),
endpoint_uri_pair_.local.size ());
zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
// Send remote endpoint URI in fourth frame (string)
zmq_msg_init_size (&msg, endpoint_uri_pair_.remote.size ());
memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.remote.c_str (),
endpoint_uri_pair_.remote.size ());
zmq_msg_send (&msg, _monitor_socket, 0);
} break;
}
}
}
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
{
// this is a private method which is only called from
// contexts where the mutex has been locked before
// contexts where the _monitor_sync mutex has been locked before
if (_monitor_socket) {
if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
&& send_monitor_stopped_event_)
monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0,
endpoint_uri_pair_t ());
zmq_close (_monitor_socket);
_monitor_socket = NULL;
_monitor_events = 0;
......
......@@ -43,6 +43,7 @@
#include "i_mailbox.hpp"
#include "clock.hpp"
#include "pipe.hpp"
#include "endpoint.hpp"
extern "C" {
void zmq_free_event (void *data_, void *hint_);
......@@ -118,26 +119,38 @@ class socket_base_t : public own_t,
void lock ();
void unlock ();
int monitor (const char *endpoint_, int events_);
int monitor (const char *endpoint_, uint64_t events_, int event_version_);
void event_connected (const std::string &endpoint_uri_, zmq::fd_t fd_);
void event_connect_delayed (const std::string &endpoint_uri_, int err_);
void event_connect_retried (const std::string &endpoint_uri_,
void event_connected (const endpoint_uri_pair_t &endpoint_uri_pair_,
zmq::fd_t fd_);
void event_connect_delayed (const endpoint_uri_pair_t &endpoint_uri_pair_,
int err_);
void event_connect_retried (const endpoint_uri_pair_t &endpoint_uri_pair_,
int interval_);
void event_listening (const std::string &endpoint_uri_, zmq::fd_t fd_);
void event_bind_failed (const std::string &endpoint_uri_, int err_);
void event_accepted (const std::string &endpoint_uri_, zmq::fd_t fd_);
void event_accept_failed (const std::string &endpoint_uri_, int err_);
void event_closed (const std::string &endpoint_uri_, zmq::fd_t fd_);
void event_close_failed (const std::string &endpoint_uri_, int err_);
void event_disconnected (const std::string &endpoint_uri_, zmq::fd_t fd_);
void event_handshake_failed_no_detail (const std::string &endpoint_uri_,
int err_);
void event_handshake_failed_protocol (const std::string &endpoint_uri_,
int err_);
void event_handshake_failed_auth (const std::string &endpoint_uri_,
int err_);
void event_handshake_succeeded (const std::string &endpoint_uri_, int err_);
void event_listening (const endpoint_uri_pair_t &endpoint_uri_pair_,
zmq::fd_t fd_);
void event_bind_failed (const endpoint_uri_pair_t &endpoint_uri_pair_,
int err_);
void event_accepted (const endpoint_uri_pair_t &endpoint_uri_pair_,
zmq::fd_t fd_);
void event_accept_failed (const endpoint_uri_pair_t &endpoint_uri_pair_,
int err_);
void event_closed (const endpoint_uri_pair_t &endpoint_uri_pair_,
zmq::fd_t fd_);
void event_close_failed (const endpoint_uri_pair_t &endpoint_uri_pair_,
int err_);
void event_disconnected (const endpoint_uri_pair_t &endpoint_uri_pair_,
zmq::fd_t fd_);
void event_handshake_failed_no_detail (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_);
void event_handshake_failed_protocol (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_);
void
event_handshake_failed_auth (const endpoint_uri_pair_t &endpoint_uri_pair_,
int err_);
void
event_handshake_succeeded (const endpoint_uri_pair_t &endpoint_uri_pair_,
int err_);
// Query the state of a specific peer. The default implementation
// always returns an ENOTSUP error.
......@@ -186,19 +199,22 @@ class socket_base_t : public own_t,
private:
// test if event should be sent and then dispatch it
void event (const std::string &endpoint_uri_, intptr_t value_, int type_);
void event (const endpoint_uri_pair_t &endpoint_uri_pair_,
uint64_t value_,
uint64_t type_);
// Socket event data dispatch
void monitor_event (int event_,
intptr_t value_,
const std::string &endpoint_uri_) const;
void monitor_event (uint64_t event_,
uint64_t value_,
const endpoint_uri_pair_t &endpoint_uri_pair_) const;
// Monitor socket cleanup
void stop_monitor (bool send_monitor_stopped_event_ = true);
// Creates new endpoint ID and adds the endpoint to the map.
void
add_endpoint (const char *endpoint_uri_, own_t *endpoint_, pipe_t *pipe_);
void add_endpoint (const endpoint_uri_pair_t &endpoint_pair_,
own_t *endpoint_,
pipe_t *pipe_);
// Map of open endpoints.
typedef std::pair<own_t *, pipe_t *> endpoint_pipe_t;
......@@ -295,7 +311,7 @@ class socket_base_t : public own_t,
void *_monitor_socket;
// Bitmask of events being monitored
int _monitor_events;
int64_t _monitor_events;
// Last socket endpoint resolved URI
std::string _last_endpoint;
......
......@@ -150,15 +150,19 @@ void zmq::socks_connecter_t::in_event ()
if (rc == -1)
error ();
else {
// TODO query the remote endpoint and pass it here
endpoint_uri_pair_t endpoint_pair =
make_unconnected_connect_endpoint_pair (_endpoint);
// Create the engine object for this connection.
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (_s, options, _endpoint);
stream_engine_t *engine = new (std::nothrow)
stream_engine_t (_s, options, endpoint_pair);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
send_attach (_session, engine);
_socket->event_connected (_endpoint, _s);
_socket->event_connected (endpoint_pair, _s);
rm_fd (_handle);
_s = -1;
......@@ -225,7 +229,8 @@ void zmq::socks_connecter_t::initiate_connect ()
_handle = add_fd (_s);
set_pollout (_handle);
_status = waiting_for_proxy_connection;
_socket->event_connect_delayed (_endpoint, zmq_errno ());
_socket->event_connect_delayed (
make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
}
// Handle any other error condition by eventual reconnect.
else {
......@@ -272,7 +277,8 @@ void zmq::socks_connecter_t::start_timer ()
const int interval = get_new_reconnect_ivl ();
add_timer (interval, reconnect_timer_id);
_status = waiting_for_reconnect_time;
_socket->event_connect_retried (_endpoint, interval);
_socket->event_connect_retried (
make_unconnected_connect_endpoint_pair (_endpoint), interval);
}
}
......@@ -443,7 +449,8 @@ void zmq::socks_connecter_t::close ()
const int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (_endpoint, _s);
_socket->event_closed (make_unconnected_connect_endpoint_pair (_endpoint),
_s);
_s = retired_fd;
}
......
......@@ -94,7 +94,8 @@ void zmq::stream_connecter_base_t::add_reconnect_timer ()
if (options.reconnect_ivl != -1) {
const int interval = get_new_reconnect_ivl ();
add_timer (interval, reconnect_timer_id);
_socket->event_connect_retried (_endpoint, interval);
_socket->event_connect_retried (
make_unconnected_connect_endpoint_pair (_endpoint), interval);
_reconnect_timer_started = true;
}
}
......@@ -131,7 +132,8 @@ void zmq::stream_connecter_base_t::close ()
const int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (_endpoint, _s);
_socket->event_closed (make_unconnected_connect_endpoint_pair (_endpoint),
_s);
_s = retired_fd;
}
......@@ -145,9 +147,12 @@ void zmq::stream_connecter_base_t::in_event ()
void zmq::stream_connecter_base_t::create_engine (fd_t fd)
{
const endpoint_uri_pair_t endpoint_pair ("TODO query local endpoint",
_endpoint, endpoint_type_connect);
// Create the engine object for this connection.
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, _endpoint);
new (std::nothrow) stream_engine_t (fd, options, endpoint_pair);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
......@@ -156,7 +161,7 @@ void zmq::stream_connecter_base_t::create_engine (fd_t fd)
// Shut the connecter down.
terminate ();
_socket->event_connected (_endpoint, fd);
_socket->event_connected (endpoint_pair, fd);
}
void zmq::stream_connecter_base_t::timer_event (int id_)
......
......@@ -100,9 +100,10 @@ static std::string get_peer_address (zmq::fd_t s_)
}
zmq::stream_engine_t::stream_engine_t (fd_t fd_,
const options_t &options_,
const std::string &endpoint_) :
zmq::stream_engine_t::stream_engine_t (
fd_t fd_,
const options_t &options_,
const endpoint_uri_pair_t &endpoint_uri_pair_) :
_s (fd_),
_handle (static_cast<handle_t> (NULL)),
_inpos (NULL),
......@@ -117,7 +118,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_,
_greeting_bytes_read (0),
_session (NULL),
_options (options_),
_endpoint (endpoint_),
_endpoint_uri_pair (endpoint_uri_pair_),
_plugged (false),
_next_msg (&stream_engine_t::routing_id_msg),
_process_msg (&stream_engine_t::process_routing_id_msg),
......@@ -894,9 +895,9 @@ void zmq::stream_engine_t::zap_msg_available ()
restart_output ();
}
const char *zmq::stream_engine_t::get_endpoint () const
const zmq::endpoint_uri_pair_t &zmq::stream_engine_t::get_endpoint () const
{
return _endpoint.c_str ();
return _endpoint_uri_pair;
}
void zmq::stream_engine_t::mechanism_ready ()
......@@ -960,7 +961,7 @@ void zmq::stream_engine_t::mechanism_ready ()
alloc_assert (_metadata);
}
_socket->event_handshake_succeeded (_endpoint, 0);
_socket->event_handshake_succeeded (_endpoint_uri_pair, 0);
}
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
......@@ -1081,10 +1082,10 @@ void zmq::stream_engine_t::error (error_reason_t reason_)
&& (_mechanism == NULL
|| _mechanism->status () == mechanism_t::handshaking)) {
int err = errno;
_socket->event_handshake_failed_no_detail (_endpoint, err);
_socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err);
}
_socket->event_disconnected (_endpoint, _s);
_socket->event_disconnected (_endpoint_uri_pair, _s);
_session->flush ();
_session->engine_error (reason_);
unplug ();
......
......@@ -70,7 +70,7 @@ class stream_engine_t : public io_object_t, public i_engine
stream_engine_t (fd_t fd_,
const options_t &options_,
const std::string &endpoint_);
const endpoint_uri_pair_t &endpoint_uri_pair_);
~stream_engine_t ();
// i_engine interface implementation.
......@@ -79,7 +79,7 @@ class stream_engine_t : public io_object_t, public i_engine
bool restart_input ();
void restart_output ();
void zap_msg_available ();
const char *get_endpoint () const;
const endpoint_uri_pair_t &get_endpoint () const;
// i_poll_events interface implementation.
void in_event ();
......@@ -190,8 +190,8 @@ class stream_engine_t : public io_object_t, public i_engine
const options_t _options;
// String representation of endpoint
const std::string _endpoint;
// Representation of the connected endpoints.
const endpoint_uri_pair_t _endpoint_uri_pair;
bool _plugged;
......
......@@ -96,7 +96,7 @@ int zmq::stream_listener_base_t::close ()
const int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (_endpoint, _s);
_socket->event_closed (make_unconnected_bind_endpoint_pair (_endpoint), _s);
_s = retired_fd;
return 0;
......@@ -104,8 +104,11 @@ int zmq::stream_listener_base_t::close ()
void zmq::stream_listener_base_t::create_engine (fd_t fd)
{
const endpoint_uri_pair_t endpoint_pair (_endpoint, get_socket_name (fd),
endpoint_type_bind);
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, _endpoint);
new (std::nothrow) stream_engine_t (fd, options, endpoint_pair);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
......@@ -120,5 +123,6 @@ void zmq::stream_listener_base_t::create_engine (fd_t fd)
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
_socket->event_accepted (_endpoint, fd);
_socket->event_accepted (endpoint_pair, fd);
}
......@@ -140,7 +140,8 @@ void zmq::tcp_connecter_t::start_connecting ()
else if (rc == -1 && errno == EINPROGRESS) {
_handle = add_fd (_s);
set_pollout (_handle);
_socket->event_connect_delayed (_endpoint, zmq_errno ());
_socket->event_connect_delayed (
make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
// add userspace connect timeout
add_connect_timer ();
......
......@@ -72,7 +72,8 @@ void zmq::tcp_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) {
_socket->event_accept_failed (_endpoint, zmq_errno ());
_socket->event_accept_failed (
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return;
}
......@@ -83,7 +84,8 @@ void zmq::tcp_listener_t::in_event ()
options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt);
if (rc != 0) {
_socket->event_accept_failed (_endpoint, zmq_errno ());
_socket->event_accept_failed (
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return;
}
......@@ -107,7 +109,8 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
if (options.use_fd != -1) {
_s = options.use_fd;
_socket->event_listening (_endpoint, _s);
_socket->event_listening (
make_unconnected_bind_endpoint_pair (_endpoint), _s);
return 0;
}
......@@ -194,7 +197,8 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
goto error;
#endif
_socket->event_listening (_endpoint, _s);
_socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint),
_s);
return 0;
error:
......
......@@ -94,7 +94,8 @@ void zmq::tipc_connecter_t::start_connecting ()
else if (rc == -1 && errno == EINPROGRESS) {
_handle = add_fd (_s);
set_pollout (_handle);
_socket->event_connect_delayed (_endpoint, zmq_errno ());
_socket->event_connect_delayed (
make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
}
// Handle any other error condition by eventual reconnect.
......
......@@ -68,7 +68,8 @@ void zmq::tipc_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) {
_socket->event_accept_failed (_endpoint, zmq_errno ());
_socket->event_accept_failed (
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return;
}
......@@ -129,7 +130,8 @@ int zmq::tipc_listener_t::set_address (const char *addr_)
if (rc != 0)
goto error;
_socket->event_listening (_endpoint, _s);
_socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint),
_s);
return 0;
error:
......
......@@ -444,9 +444,9 @@ void zmq::udp_engine_t::out_event ()
reset_pollout (_handle);
}
const char *zmq::udp_engine_t::get_endpoint () const
const zmq::endpoint_uri_pair_t &zmq::udp_engine_t::get_endpoint () const
{
return "";
return _empty_endpoint;
}
void zmq::udp_engine_t::restart_output ()
......
......@@ -43,12 +43,14 @@ class udp_engine_t : public io_object_t, public i_engine
void in_event ();
void out_event ();
const char *get_endpoint () const;
const endpoint_uri_pair_t &get_endpoint () const;
private:
int resolve_raw_address (char *addr_, size_t length_);
void sockaddr_to_msg (zmq::msg_t *msg_, sockaddr_in *addr_);
const endpoint_uri_pair_t _empty_endpoint;
bool _plugged;
fd_t _fd;
......
......@@ -267,12 +267,20 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
return s->getsockopt (option_, optval_, optvallen_);
}
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
int zmq_socket_monitor_versioned (void *s_,
const char *addr_,
uint64_t events_,
int event_version_)
{
zmq::socket_base_t *s = as_socket_base_t (s_);
if (!s)
return -1;
return s->monitor (addr_, events_);
return s->monitor (addr_, events_, event_version_);
}
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
{
return zmq_socket_monitor_versioned (s_, addr_, events_, 1);
}
int zmq_join (void *s_, const char *group_)
......
......@@ -123,6 +123,11 @@ int zmq_socket_get_peer_state (void *socket_,
const void *routing_id_,
size_t routing_id_size_);
int zmq_socket_monitor_versioned (void *s_,
const char *addr_,
uint64_t events_,
int event_version_);
#endif // ZMQ_BUILD_DRAFT_API
#endif //ifndef __ZMQ_DRAFT_H_INCLUDED__
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment