Commit feadf6d4 authored by Luca Boccassi's avatar Luca Boccassi Committed by Luca Boccassi

Problem: cannot monitor state of queues at runtime

Solution: add API and ZMQ_EVENT_PIPES_STATS event which generates 2
values, one for the egress and one for the ingress pipes respectively.
Refactor the events code to be able to send multiple values.
parent cb737452
...@@ -12,6 +12,8 @@ SYNOPSIS ...@@ -12,6 +12,8 @@ SYNOPSIS
-------- --------
*int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version');* *int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version');*
*int zmq_socket_monitor_pipes_stats (void '*socket');*
DESCRIPTION DESCRIPTION
----------- -----------
...@@ -41,18 +43,23 @@ Each event is sent in multiple frames. The first frame contains an event ...@@ -41,18 +43,23 @@ Each event is sent in multiple frames. The first frame contains an event
number (64 bits). The number and content of further frames depend on this number (64 bits). The number and content of further frames depend on this
event number. event number.
Unless it is specified differently, the second frame contains an event Unless it is specified differently, the second frame contains the number of
value (64 bits) that provides additional data according to the event number. value frames that will follow it as a 64 bits integer. The third frame to N-th
Some events might define additional value frames following the second one. frames contain an event value (64 bits) that provides additional data according
The third and fourth frames contain strings that specifies the affected to the event number. Each event type might have a different number of values.
connection or endpoint. The third frame contains a string denoting the local The second-to-last and last frames contain strings that specifies the affected
endpoint, while the fourth frame contains a string denoting the remote endpoint. connection or endpoint. The former frame contains a string denoting the local
endpoint, while the latter frame contains a string denoting the remote endpoint.
Either of these may be empty, depending on the event type and whether the Either of these may be empty, depending on the event type and whether the
connection uses a bound or connected local endpoint. connection uses a bound or connected local endpoint.
Note that the format of the second and further frames, and also the number of Note that the format of the second and further frames, and also the number of
frames, may be different for events added in the future. frames, may be different for events added in the future.
The _zmq_socket_monitor_pipes_stats()_ method triggers an event of type
ZMQ_EVENT_PIPES_STATS for each connected peer of the monitored socket.
NOTE: _zmq_socket_monitor_pipes_stats()_ is in DRAFT state.
---- ----
Monitoring events are only generated by some transports: At the moment these Monitoring events are only generated by some transports: At the moment these
are SOCKS, TCP, IPC, and TIPC. Note that it is not an error to call are SOCKS, TCP, IPC, and TIPC. Note that it is not an error to call
...@@ -168,17 +175,35 @@ The ZMTP security mechanism handshake failed due to an authentication failure. ...@@ -168,17 +175,35 @@ The ZMTP security mechanism handshake failed due to an authentication failure.
The event value is the status code returned by the ZAP handler (i.e. 300, The event value is the status code returned by the ZAP handler (i.e. 300,
400 or 500). 400 or 500).
----
Supported events (v2)
----------------
ZMQ_EVENT_PIPE_STATS
~~~~~~~~~~~~~~~~~~~~
This event provides two values, the number of messages in each of the two
queues associated with the returned endpoint (respectively egress and ingress).
This event only triggers after calling the function
_zmq_socket_monitor_pipes_stats()_.
NOTE: this measurement is asynchronous, so by the time the message is received
the internal state might have already changed.
NOTE: when the monitored socket and the monitor are not used in a poll, the
event might not be delivered until an API has been called on the monitored
socket, like zmq_getsockopt for example (the option is irrelevant).
NOTE: in DRAFT state, not yet available in stable releases.
RETURN VALUE RETURN VALUE
------------ ------------
The _zmq_socket_monitor()_ function returns a value of 0 or greater if The _zmq_socket_monitor()_ and _zmq_socket_monitor_pipes_stats()_ functions
successful. Otherwise it returns `-1` and sets 'errno' to one of the values return a value of 0 or greater if successful. Otherwise they return `-1` and
defined below. set 'errno' to one of the values defined below.
ERRORS ERRORS - _zmq_socket_monitor()_
------ -------------------------------
*ETERM*:: *ETERM*::
The 0MQ 'context' associated with the specified 'socket' was terminated. The 0MQ 'context' associated with the specified 'socket' was terminated.
...@@ -189,11 +214,23 @@ sockets are required to use the inproc:// transport. ...@@ -189,11 +214,23 @@ sockets are required to use the inproc:// transport.
*EINVAL*:: *EINVAL*::
The monitor 'endpoint' supplied does not exist. The monitor 'endpoint' supplied does not exist.
ERRORS - _zmq_socket_monitor_pipes_stats()_
-------------------------------------------
*ENOTSOCK*::
The 'socket' parameter was not a valid 0MQ socket.
*EINVAL*::
The socket did not have monitoring enabled.
*EAGAIN*::
The monitored socket did not have any connections to monitor yet.
EXAMPLE EXAMPLE
------- -------
.Monitoring client and server sockets .Monitoring client and server sockets
---- ----
// Read one event off the monitor socket; return value and address // Read one event off the monitor socket; return values and addresses
// by reference, if not null, and event number by value. Returns -1 // by reference, if not null, and event number by value. Returns -1
// in case of error. // in case of error.
...@@ -211,18 +248,29 @@ get_monitor_event (void *monitor, uint64_t *value, char **local_address, char ** ...@@ -211,18 +248,29 @@ get_monitor_event (void *monitor, uint64_t *value, char **local_address, char **
memcpy (&event, zmq_msg_data (&msg), sizeof (event)); memcpy (&event, zmq_msg_data (&msg), sizeof (event));
zmq_msg_close (&msg); zmq_msg_close (&msg);
// Second frame to Nth frame (depends on the event) in message // Second frame in message contains the number of values
// contains event value zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert (zmq_msg_more (&msg));
uint64_t value_count;
memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count));
zmq_msg_close (&msg);
for (uint64_t i = 0; i < value_count; ++i) {
// Subsequent frames in message contain event values
zmq_msg_init (&msg); zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1) if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably return -1; // Interrupted, presumably
assert (zmq_msg_more (&msg)); assert (zmq_msg_more (&msg));
if (value_) if (value_ && value_ + i)
memcpy (value_, zmq_msg_data (&msg), sizeof *value_); memcpy (value_ + i, zmq_msg_data (&msg), sizeof (*value_));
zmq_msg_close (&msg); zmq_msg_close (&msg);
}
// Third frame in message contains local address // Second-to-last frame in message contains local address
zmq_msg_init (&msg); zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1) if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably return -1; // Interrupted, presumably
...@@ -237,7 +285,7 @@ get_monitor_event (void *monitor, uint64_t *value, char **local_address, char ** ...@@ -237,7 +285,7 @@ get_monitor_event (void *monitor, uint64_t *value, char **local_address, char **
} }
zmq_msg_close (&msg); zmq_msg_close (&msg);
// Fourth frame in message contains remote address // Last frame in message contains remote address
zmq_msg_init (&msg); zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1) if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably return -1; // Interrupted, presumably
......
...@@ -726,16 +726,20 @@ ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket, ...@@ -726,16 +726,20 @@ ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket,
const void *routing_id, const void *routing_id,
size_t routing_id_size); size_t routing_id_size);
/* DRAFT Socket monitoring events */
#define ZMQ_EVENT_PIPES_STATS 0x10000
#define ZMQ_CURRENT_EVENT_VERSION 1 #define ZMQ_CURRENT_EVENT_VERSION 1
#define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2 #define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2
#define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL #define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 #define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS
ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_, ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_,
const char *addr_, const char *addr_,
uint64_t events_, uint64_t events_,
int event_version_); int event_version_);
ZMQ_EXPORT int zmq_socket_monitor_pipes_stats (void *s);
#endif // ZMQ_BUILD_DRAFT_API #endif // ZMQ_BUILD_DRAFT_API
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include <string> #include <string>
#include "stdint.hpp" #include "stdint.hpp"
#include "endpoint.hpp"
namespace zmq namespace zmq
{ {
...@@ -73,6 +74,8 @@ __declspec(align (64)) ...@@ -73,6 +74,8 @@ __declspec(align (64))
reap, reap,
reaped, reaped,
inproc_connected, inproc_connected,
pipe_peer_stats,
pipe_stats_publish,
done done
} type; } type;
...@@ -186,6 +189,23 @@ __declspec(align (64)) ...@@ -186,6 +189,23 @@ __declspec(align (64))
{ {
} reaped; } reaped;
// Send application-side pipe count and ask to send monitor event
struct
{
uint64_t queue_count;
zmq::own_t *socket_base;
endpoint_uri_pair_t *endpoint_pair;
} pipe_peer_stats;
// Collate application thread and I/O thread pipe counts and endpoints
// and send as event
struct
{
uint64_t outbound_queue_count;
uint64_t inbound_queue_count;
endpoint_uri_pair_t *endpoint_pair;
} pipe_stats_publish;
// Sent by reaper thread to the term thread when all the sockets // Sent by reaper thread to the term thread when all the sockets
// are successfully deallocated. // are successfully deallocated.
struct struct
......
...@@ -107,6 +107,19 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -107,6 +107,19 @@ void zmq::object_t::process_command (command_t &cmd_)
process_hiccup (cmd_.args.hiccup.pipe); process_hiccup (cmd_.args.hiccup.pipe);
break; break;
case command_t::pipe_peer_stats:
process_pipe_peer_stats (cmd_.args.pipe_peer_stats.queue_count,
cmd_.args.pipe_peer_stats.socket_base,
cmd_.args.pipe_peer_stats.endpoint_pair);
break;
case command_t::pipe_stats_publish:
process_pipe_stats_publish (
cmd_.args.pipe_stats_publish.outbound_queue_count,
cmd_.args.pipe_stats_publish.inbound_queue_count,
cmd_.args.pipe_stats_publish.endpoint_pair);
break;
case command_t::pipe_term: case command_t::pipe_term:
process_pipe_term (); process_pipe_term ();
break; break;
...@@ -285,6 +298,35 @@ void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_) ...@@ -285,6 +298,35 @@ void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_pipe_peer_stats (pipe_t *destination_,
uint64_t queue_count_,
own_t *socket_base_,
endpoint_uri_pair_t *endpoint_pair_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::pipe_peer_stats;
cmd.args.pipe_peer_stats.queue_count = queue_count_;
cmd.args.pipe_peer_stats.socket_base = socket_base_;
cmd.args.pipe_peer_stats.endpoint_pair = endpoint_pair_;
send_command (cmd);
}
void zmq::object_t::send_pipe_stats_publish (
own_t *destination_,
uint64_t outbound_queue_count_,
uint64_t inbound_queue_count_,
endpoint_uri_pair_t *endpoint_pair_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::pipe_stats_publish;
cmd.args.pipe_stats_publish.outbound_queue_count = outbound_queue_count_;
cmd.args.pipe_stats_publish.inbound_queue_count = inbound_queue_count_;
cmd.args.pipe_stats_publish.endpoint_pair = endpoint_pair_;
send_command (cmd);
}
void zmq::object_t::send_pipe_term (pipe_t *destination_) void zmq::object_t::send_pipe_term (pipe_t *destination_)
{ {
command_t cmd; command_t cmd;
...@@ -422,6 +464,20 @@ void zmq::object_t::process_hiccup (void *) ...@@ -422,6 +464,20 @@ void zmq::object_t::process_hiccup (void *)
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_pipe_peer_stats (uint64_t,
own_t *,
endpoint_uri_pair_t *)
{
zmq_assert (false);
}
void zmq::object_t::process_pipe_stats_publish (uint64_t,
uint64_t,
endpoint_uri_pair_t *)
{
zmq_assert (false);
}
void zmq::object_t::process_pipe_term () void zmq::object_t::process_pipe_term ()
{ {
zmq_assert (false); zmq_assert (false);
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include <string> #include <string>
#include "stdint.hpp" #include "stdint.hpp"
#include "endpoint.hpp"
namespace zmq namespace zmq
{ {
...@@ -96,6 +97,14 @@ class object_t ...@@ -96,6 +97,14 @@ class object_t
void send_activate_read (zmq::pipe_t *destination_); void send_activate_read (zmq::pipe_t *destination_);
void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_); void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_);
void send_hiccup (zmq::pipe_t *destination_, void *pipe_); void send_hiccup (zmq::pipe_t *destination_, void *pipe_);
void send_pipe_peer_stats (zmq::pipe_t *destination_,
uint64_t queue_count_,
zmq::own_t *socket_base,
endpoint_uri_pair_t *endpoint_pair_);
void send_pipe_stats_publish (zmq::own_t *destination_,
uint64_t outbound_queue_count_,
uint64_t inbound_queue_count_,
endpoint_uri_pair_t *endpoint_pair_);
void send_pipe_term (zmq::pipe_t *destination_); void send_pipe_term (zmq::pipe_t *destination_);
void send_pipe_term_ack (zmq::pipe_t *destination_); void send_pipe_term_ack (zmq::pipe_t *destination_);
void send_pipe_hwm (zmq::pipe_t *destination_, int inhwm_, int outhwm_); void send_pipe_hwm (zmq::pipe_t *destination_, int inhwm_, int outhwm_);
...@@ -117,6 +126,13 @@ class object_t ...@@ -117,6 +126,13 @@ class object_t
virtual void process_activate_read (); virtual void process_activate_read ();
virtual void process_activate_write (uint64_t msgs_read_); virtual void process_activate_write (uint64_t msgs_read_);
virtual void process_hiccup (void *pipe_); virtual void process_hiccup (void *pipe_);
virtual void process_pipe_peer_stats (uint64_t queue_count_,
zmq::own_t *socket_base_,
endpoint_uri_pair_t *endpoint_pair_);
virtual void
process_pipe_stats_publish (uint64_t outbound_queue_count_,
uint64_t inbound_queue_count_,
endpoint_uri_pair_t *endpoint_pair_);
virtual void process_pipe_term (); virtual void process_pipe_term ();
virtual void process_pipe_term_ack (); virtual void process_pipe_term_ack ();
virtual void process_pipe_hwm (int inhwm_, int outhwm_); virtual void process_pipe_hwm (int inhwm_, int outhwm_);
......
...@@ -563,3 +563,19 @@ const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const ...@@ -563,3 +563,19 @@ const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const
{ {
return _endpoint_pair; return _endpoint_pair;
} }
void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_)
{
endpoint_uri_pair_t *ep =
new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair);
send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read, socket_base_,
ep);
}
void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_,
own_t *socket_base_,
endpoint_uri_pair_t *endpoint_pair_)
{
send_pipe_stats_publish (socket_base_, queue_count_,
_msgs_written - _peers_msgs_read, endpoint_pair_);
}
...@@ -145,6 +145,8 @@ class pipe_t : public object_t, ...@@ -145,6 +145,8 @@ class pipe_t : public object_t,
void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_); void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_);
const endpoint_uri_pair_t &get_endpoint_pair () const; const endpoint_uri_pair_t &get_endpoint_pair () const;
void send_stats_to_peer (own_t *socket_base_);
private: private:
// Type of the underlying lock-free pipe. // Type of the underlying lock-free pipe.
typedef ypipe_base_t<msg_t> upipe_t; typedef ypipe_base_t<msg_t> upipe_t;
...@@ -153,6 +155,9 @@ class pipe_t : public object_t, ...@@ -153,6 +155,9 @@ class pipe_t : public object_t,
void process_activate_read (); void process_activate_read ();
void process_activate_write (uint64_t msgs_read_); void process_activate_write (uint64_t msgs_read_);
void process_hiccup (void *pipe_); void process_hiccup (void *pipe_);
void process_pipe_peer_stats (uint64_t queue_count_,
own_t *socket_base_,
endpoint_uri_pair_t *endpoint_pair_);
void process_pipe_term (); void process_pipe_term ();
void process_pipe_term_ack (); void process_pipe_term_ack ();
void process_pipe_hwm (int inhwm_, int outhwm_); void process_pipe_hwm (int inhwm_, int outhwm_);
......
...@@ -409,6 +409,11 @@ void zmq::session_base_t::process_attach (i_engine *engine_) ...@@ -409,6 +409,11 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
zmq_assert (!_pipe); zmq_assert (!_pipe);
_pipe = pipes[0]; _pipe = pipes[0];
// The endpoints strings are not set on bind, set them here so that
// events can use them.
pipes[0]->set_endpoint_pair (engine_->get_endpoint ());
pipes[1]->set_endpoint_pair (engine_->get_endpoint ());
// Ask socket to plug into the remote end of the pipe. // Ask socket to plug into the remote end of the pipe.
send_bind (_socket, pipes[1]); send_bind (_socket, pipes[1]);
} }
......
...@@ -1421,6 +1421,45 @@ void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_) ...@@ -1421,6 +1421,45 @@ void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
delete endpoint_; delete endpoint_;
} }
void zmq::socket_base_t::process_pipe_stats_publish (
uint64_t outbound_queue_count_,
uint64_t inbound_queue_count_,
endpoint_uri_pair_t *endpoint_pair_)
{
uint64_t values[2] = {outbound_queue_count_, inbound_queue_count_};
event (*endpoint_pair_, values, 2, ZMQ_EVENT_PIPES_STATS);
delete endpoint_pair_;
}
/*
* There are 2 pipes per connection, and the inbound one _must_ be queried from
* the I/O thread. So ask the outbound pipe, in the application thread, to send
* a message (pipe_peer_stats) to its peer. The message will carry the outbound
* pipe stats and endpoint, and the reference to the socket object.
* The inbound pipe on the I/O thread will then add its own stats and endpoint,
* and write back a message to the socket object (pipe_stats_publish) which
* will raise an event with the data.
*/
int zmq::socket_base_t::query_pipes_stats ()
{
{
scoped_lock_t lock (_monitor_sync);
if (!(_monitor_events & ZMQ_EVENT_PIPES_STATS)) {
errno = EINVAL;
return -1;
}
}
if (_pipes.size () == 0) {
errno = EAGAIN;
return -1;
}
for (pipes_t::size_type i = 0; i != _pipes.size (); ++i) {
_pipes[i]->send_stats_to_peer (this);
}
return 0;
}
void zmq::socket_base_t::update_pipe_options (int option_) void zmq::socket_base_t::update_pipe_options (int option_)
{ {
if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) { if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {
......
...@@ -157,6 +157,11 @@ class socket_base_t : public own_t, ...@@ -157,6 +157,11 @@ class socket_base_t : public own_t,
virtual int get_peer_state (const void *routing_id_, virtual int get_peer_state (const void *routing_id_,
size_t routing_id_size_) const; size_t routing_id_size_) const;
// Request for pipes statistics - will generate a ZMQ_EVENT_PIPES_STATS
// after gathering the data asynchronously. Requires event monitoring to
// be enabled.
int query_pipes_stats ();
protected: protected:
socket_base_t (zmq::ctx_t *parent_, socket_base_t (zmq::ctx_t *parent_,
uint32_t tid_, uint32_t tid_,
...@@ -278,6 +283,9 @@ class socket_base_t : public own_t, ...@@ -278,6 +283,9 @@ class socket_base_t : public own_t,
// Handlers for incoming commands. // Handlers for incoming commands.
void process_stop (); void process_stop ();
void process_bind (zmq::pipe_t *pipe_); void process_bind (zmq::pipe_t *pipe_);
void process_pipe_stats_publish (uint64_t outbound_queue_count_,
uint64_t inbound_queue_count_,
endpoint_uri_pair_t *endpoint_pair_);
void process_term (int linger_); void process_term (int linger_);
void process_term_endpoint (std::string *endpoint_); void process_term_endpoint (std::string *endpoint_);
......
...@@ -1452,3 +1452,11 @@ int zmq_has (const char *capability_) ...@@ -1452,3 +1452,11 @@ int zmq_has (const char *capability_)
// Whatever the application asked for, we don't have // Whatever the application asked for, we don't have
return false; return false;
} }
int zmq_socket_monitor_pipes_stats (void *s_)
{
zmq::socket_base_t *s = as_socket_base_t (s_);
if (!s)
return -1;
return s->query_pipes_stats ();
}
...@@ -123,10 +123,20 @@ int zmq_socket_get_peer_state (void *socket_, ...@@ -123,10 +123,20 @@ int zmq_socket_get_peer_state (void *socket_,
const void *routing_id_, const void *routing_id_,
size_t routing_id_size_); size_t routing_id_size_);
/* DRAFT Socket monitoring events */
#define ZMQ_EVENT_PIPES_STATS 0x10000
#define ZMQ_CURRENT_EVENT_VERSION 1
#define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2
#define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS
int zmq_socket_monitor_versioned (void *s_, int zmq_socket_monitor_versioned (void *s_,
const char *addr_, const char *addr_,
uint64_t events_, uint64_t events_,
int event_version_); int event_version_);
int zmq_socket_monitor_pipes_stats (void *s_);
#endif // ZMQ_BUILD_DRAFT_API #endif // ZMQ_BUILD_DRAFT_API
......
...@@ -50,6 +50,13 @@ void test_monitor_invalid_protocol_fails () ...@@ -50,6 +50,13 @@ void test_monitor_invalid_protocol_fails ()
TEST_ASSERT_FAILURE_ERRNO ( TEST_ASSERT_FAILURE_ERRNO (
EPROTONOSUPPORT, zmq_socket_monitor (client, "tcp://127.0.0.1:*", 0)); EPROTONOSUPPORT, zmq_socket_monitor (client, "tcp://127.0.0.1:*", 0));
#ifdef ZMQ_EVENT_PIPES_STATS
// Stats command needs to be called on a valid socket with monitoring
// enabled
TEST_ASSERT_FAILURE_ERRNO (ENOTSOCK, zmq_socket_monitor_pipes_stats (NULL));
TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_socket_monitor_pipes_stats (client));
#endif
test_context_socket_close_zero_linger (client); test_context_socket_close_zero_linger (client);
} }
...@@ -121,8 +128,9 @@ void test_monitor_basic () ...@@ -121,8 +128,9 @@ void test_monitor_basic ()
test_context_socket_close_zero_linger (server_mon); test_context_socket_close_zero_linger (server_mon);
} }
#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) || \ #if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
(defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) || (defined ZMQ_CURRENT_EVENT_VERSION \
&& ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
void test_monitor_versioned_basic (bind_function_t bind_function_, void test_monitor_versioned_basic (bind_function_t bind_function_,
const char *expected_prefix_) const char *expected_prefix_)
{ {
...@@ -242,6 +250,133 @@ void test_monitor_versioned_basic_tipc () ...@@ -242,6 +250,133 @@ void test_monitor_versioned_basic_tipc ()
static const char prefix[] = "tipc://"; static const char prefix[] = "tipc://";
test_monitor_versioned_basic (bind_loopback_tipc, prefix); test_monitor_versioned_basic (bind_loopback_tipc, prefix);
} }
#ifdef ZMQ_EVENT_PIPES_STATS
void test_monitor_versioned_stats (bind_function_t bind_function_,
const char *expected_prefix_)
{
char server_endpoint[MAX_SOCKET_STRING];
const int pulls_count = 4;
void *pulls[pulls_count];
// We'll monitor these two sockets
void *push = test_context_socket (ZMQ_PUSH);
TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned (
push, "inproc://monitor-push", ZMQ_EVENT_PIPES_STATS, 2));
// Should fail if there are no pipes to monitor
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_socket_monitor_pipes_stats (push));
void *push_mon = test_context_socket (ZMQ_PAIR);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (push_mon, "inproc://monitor-push"));
// Set lower HWM - queues will be filled so we should see it in the stats
int send_hwm = 500;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (push, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm)));
// Set very low TCP buffers so that messages cannot be stored in-flight
const int tcp_buffer_size = 4096;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
push, ZMQ_SNDBUF, &tcp_buffer_size, sizeof (tcp_buffer_size)));
bind_function_ (push, server_endpoint, sizeof (server_endpoint));
int ipv6_;
size_t ipv6_size_ = sizeof (ipv6_);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (push, ZMQ_IPV6, &ipv6_, &ipv6_size_));
for (int i = 0; i < pulls_count; ++i) {
pulls[i] = test_context_socket (ZMQ_PULL);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pulls[i], ZMQ_IPV6, &ipv6_, sizeof (int)));
int timeout_ms = 10;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
pulls[i], ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pulls[i], ZMQ_RCVHWM, &send_hwm, sizeof (send_hwm)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
pulls[i], ZMQ_RCVBUF, &tcp_buffer_size, sizeof (tcp_buffer_size)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pulls[i], server_endpoint));
}
// Send until we block
int send_count = 0;
// Saturate the TCP buffers too
char data[tcp_buffer_size * 2];
memset (data, 0, sizeof (data));
// Saturate all pipes - send + receive - on all connections
while (send_count < send_hwm * 2 * pulls_count) {
TEST_ASSERT_EQUAL_INT (sizeof (data),
zmq_send (push, data, sizeof (data), 0));
++send_count;
}
// Drain one of the pulls - doesn't matter how many messages, at least one
send_count = send_count / 4;
do {
zmq_recv (pulls[0], data, sizeof (data), 0);
--send_count;
} while (send_count > 0);
// To kick the application thread, do a dummy getsockopt - users here
// should use the monitor and the other sockets in a poll.
unsigned long int dummy;
size_t dummy_size = sizeof (dummy);
msleep (SETTLE_TIME);
// Note that the pipe stats on the sender will not get updated until the
// receiver has processed at least lwm ((hwm + 1) / 2) messages AND until
// the application thread has ran through the mailbox, as the update is
// delivered via a message (send_activate_write)
zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size);
// Ask for stats and check that they match
zmq_socket_monitor_pipes_stats (push);
msleep (SETTLE_TIME);
zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size);
for (int i = 0; i < pulls_count; ++i) {
char *push_local_address = NULL;
char *push_remote_address = NULL;
uint64_t queue_stat[2];
int64_t event = get_monitor_event_v2 (
push_mon, queue_stat, &push_local_address, &push_remote_address);
TEST_ASSERT_EQUAL_STRING (server_endpoint, push_local_address);
TEST_ASSERT_EQUAL_STRING_LEN (expected_prefix_, push_remote_address,
strlen (expected_prefix_));
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_PIPES_STATS, event);
TEST_ASSERT_EQUAL_INT (i == 0 ? 0 : send_hwm, queue_stat[0]);
TEST_ASSERT_EQUAL_INT (0, queue_stat[1]);
free (push_local_address);
free (push_remote_address);
}
// Close client and server
test_context_socket_close_zero_linger (push_mon);
test_context_socket_close_zero_linger (push);
for (int i = 0; i < pulls_count; ++i)
test_context_socket_close_zero_linger (pulls[i]);
}
void test_monitor_versioned_stats_tcp_ipv4 ()
{
static const char prefix[] = "tcp://127.0.0.1:";
test_monitor_versioned_stats (bind_loopback_ipv4, prefix);
}
void test_monitor_versioned_stats_tcp_ipv6 ()
{
static const char prefix[] = "tcp://[::1]:";
test_monitor_versioned_stats (bind_loopback_ipv6, prefix);
}
void test_monitor_versioned_stats_ipc ()
{
static const char prefix[] = "ipc://";
test_monitor_versioned_stats (bind_loopback_ipc, prefix);
}
#endif // ZMQ_EVENT_PIPES_STATS
#endif #endif
int main () int main ()
...@@ -252,12 +387,18 @@ int main () ...@@ -252,12 +387,18 @@ int main ()
RUN_TEST (test_monitor_invalid_protocol_fails); RUN_TEST (test_monitor_invalid_protocol_fails);
RUN_TEST (test_monitor_basic); RUN_TEST (test_monitor_basic);
#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) || \ #if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
(defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) || (defined ZMQ_CURRENT_EVENT_VERSION \
&& ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
RUN_TEST (test_monitor_versioned_basic_tcp_ipv4); RUN_TEST (test_monitor_versioned_basic_tcp_ipv4);
RUN_TEST (test_monitor_versioned_basic_tcp_ipv6); RUN_TEST (test_monitor_versioned_basic_tcp_ipv6);
RUN_TEST (test_monitor_versioned_basic_ipc); RUN_TEST (test_monitor_versioned_basic_ipc);
RUN_TEST (test_monitor_versioned_basic_tipc); RUN_TEST (test_monitor_versioned_basic_tipc);
#ifdef ZMQ_EVENT_PIPES_STATS
RUN_TEST (test_monitor_versioned_stats_tcp_ipv4);
RUN_TEST (test_monitor_versioned_stats_tcp_ipv6);
RUN_TEST (test_monitor_versioned_stats_ipc);
#endif
#endif #endif
return UNITY_END (); return UNITY_END ();
......
...@@ -241,7 +241,7 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, ...@@ -241,7 +241,7 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_,
zmq_msg_close (&msg); zmq_msg_close (&msg);
} }
// Third frame in message contains local address // Second-to-last frame in message contains local address
zmq_msg_init (&msg); zmq_msg_init (&msg);
int res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1; int res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1;
assert (res != -1); assert (res != -1);
...@@ -256,7 +256,7 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, ...@@ -256,7 +256,7 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_,
} }
zmq_msg_close (&msg); zmq_msg_close (&msg);
// Fourth and last frame in message contains remote address // Last frame in message contains remote address
zmq_msg_init (&msg); zmq_msg_init (&msg);
res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1; res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1;
assert (res != -1); assert (res != -1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment