Commit cb737452 authored by Luca Boccassi's avatar Luca Boccassi Committed by Luca Boccassi

Problem: cannot send more than one value per v2 event

Solution: refactor code and add extra frame with value count before the
values in v2
parent 1e26a93c
...@@ -1658,101 +1658,117 @@ int zmq::socket_base_t::monitor (const char *endpoint_, ...@@ -1658,101 +1658,117 @@ int zmq::socket_base_t::monitor (const char *endpoint_,
void zmq::socket_base_t::event_connected ( void zmq::socket_base_t::event_connected (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{ {
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_CONNECTED); uint64_t values[1] = {(uint64_t) fd_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECTED);
} }
void zmq::socket_base_t::event_connect_delayed ( void zmq::socket_base_t::event_connect_delayed (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{ {
event (endpoint_uri_pair_, err_, ZMQ_EVENT_CONNECT_DELAYED); uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_DELAYED);
} }
void zmq::socket_base_t::event_connect_retried ( void zmq::socket_base_t::event_connect_retried (
const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_) const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_)
{ {
event (endpoint_uri_pair_, interval_, ZMQ_EVENT_CONNECT_RETRIED); uint64_t values[1] = {(uint64_t) interval_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_RETRIED);
} }
void zmq::socket_base_t::event_listening ( void zmq::socket_base_t::event_listening (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{ {
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_LISTENING); uint64_t values[1] = {(uint64_t) fd_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_LISTENING);
} }
void zmq::socket_base_t::event_bind_failed ( void zmq::socket_base_t::event_bind_failed (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{ {
event (endpoint_uri_pair_, err_, ZMQ_EVENT_BIND_FAILED); uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_BIND_FAILED);
} }
void zmq::socket_base_t::event_accepted ( void zmq::socket_base_t::event_accepted (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{ {
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_ACCEPTED); uint64_t values[1] = {(uint64_t) fd_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPTED);
} }
void zmq::socket_base_t::event_accept_failed ( void zmq::socket_base_t::event_accept_failed (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{ {
event (endpoint_uri_pair_, err_, ZMQ_EVENT_ACCEPT_FAILED); uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPT_FAILED);
} }
void zmq::socket_base_t::event_closed ( void zmq::socket_base_t::event_closed (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{ {
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_CLOSED); uint64_t values[1] = {(uint64_t) fd_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSED);
} }
void zmq::socket_base_t::event_close_failed ( void zmq::socket_base_t::event_close_failed (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{ {
event (endpoint_uri_pair_, err_, ZMQ_EVENT_CLOSE_FAILED); uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSE_FAILED);
} }
void zmq::socket_base_t::event_disconnected ( void zmq::socket_base_t::event_disconnected (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{ {
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_DISCONNECTED); uint64_t values[1] = {(uint64_t) fd_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_DISCONNECTED);
} }
void zmq::socket_base_t::event_handshake_failed_no_detail ( void zmq::socket_base_t::event_handshake_failed_no_detail (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{ {
event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL); uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
} }
void zmq::socket_base_t::event_handshake_failed_protocol ( void zmq::socket_base_t::event_handshake_failed_protocol (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{ {
event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL); uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
} }
void zmq::socket_base_t::event_handshake_failed_auth ( void zmq::socket_base_t::event_handshake_failed_auth (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{ {
event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH); uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
} }
void zmq::socket_base_t::event_handshake_succeeded ( void zmq::socket_base_t::event_handshake_succeeded (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{ {
event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_SUCCEEDED); uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
} }
void zmq::socket_base_t::event (const endpoint_uri_pair_t &endpoint_uri_pair_, void zmq::socket_base_t::event (const endpoint_uri_pair_t &endpoint_uri_pair_,
uint64_t value_, uint64_t values_[],
uint64_t values_count_,
uint64_t type_) uint64_t type_)
{ {
scoped_lock_t lock (_monitor_sync); scoped_lock_t lock (_monitor_sync);
if (_monitor_events & type_) { if (_monitor_events & type_) {
monitor_event (type_, value_, endpoint_uri_pair_); monitor_event (type_, values_, values_count_, endpoint_uri_pair_);
} }
} }
// Send a monitor event // Send a monitor event
void zmq::socket_base_t::monitor_event ( void zmq::socket_base_t::monitor_event (
uint64_t event_, uint64_t event_,
uint64_t value_, uint64_t values_[],
uint64_t values_count_,
const endpoint_uri_pair_t &endpoint_uri_pair_) const const endpoint_uri_pair_t &endpoint_uri_pair_) const
{ {
// this is a private method which is only called from // this is a private method which is only called from
...@@ -1765,11 +1781,14 @@ void zmq::socket_base_t::monitor_event ( ...@@ -1765,11 +1781,14 @@ void zmq::socket_base_t::monitor_event (
case 1: { case 1: {
// The API should not allow to activate unsupported events // The API should not allow to activate unsupported events
zmq_assert (event_ <= std::numeric_limits<uint16_t>::max ()); zmq_assert (event_ <= std::numeric_limits<uint16_t>::max ());
zmq_assert (value_ <= std::numeric_limits<uint32_t>::max ()); // v1 only allows one value
zmq_assert (values_count_ == 1);
zmq_assert (values_[0]
<= std::numeric_limits<uint32_t>::max ());
// Send event and value in first frame // Send event and value in first frame
const uint16_t event = static_cast<uint16_t> (event_); const uint16_t event = static_cast<uint16_t> (event_);
const uint32_t value = static_cast<uint32_t> (value_); const uint32_t value = static_cast<uint32_t> (values_[0]);
zmq_msg_init_size (&msg, sizeof (event) + sizeof (value)); zmq_msg_init_size (&msg, sizeof (event) + sizeof (value));
uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg)); uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
// Avoid dereferencing uint32_t on unaligned address // Avoid dereferencing uint32_t on unaligned address
...@@ -1788,22 +1807,31 @@ void zmq::socket_base_t::monitor_event ( ...@@ -1788,22 +1807,31 @@ void zmq::socket_base_t::monitor_event (
} break; } break;
case 2: { case 2: {
// Send event in first frame (64bit unsigned) // Send event in first frame (64bit unsigned)
zmq_msg_init_size (&msg, sizeof event_); zmq_msg_init_size (&msg, sizeof (event_));
memcpy (zmq_msg_data (&msg), &event_, sizeof event_); memcpy (zmq_msg_data (&msg), &event_, sizeof (event_));
zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE); zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
// Send value in second frame (64bit unsigned) // Send number of values that will follow in second frame
zmq_msg_init_size (&msg, sizeof value_); zmq_msg_init_size (&msg, sizeof (values_count_));
memcpy (zmq_msg_data (&msg), &value_, sizeof value_); memcpy (zmq_msg_data (&msg), &values_count_,
sizeof (values_count_));
zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE); zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
// Send local endpoint URI in third frame (string) // Send values in third-Nth frames (64bit unsigned)
for (uint64_t i = 0; i < values_count_; ++i) {
zmq_msg_init_size (&msg, sizeof (values_[i]));
memcpy (zmq_msg_data (&msg), &values_[i],
sizeof (values_[i]));
zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
}
// Send local endpoint URI in second-to-last frame (string)
zmq_msg_init_size (&msg, endpoint_uri_pair_.local.size ()); zmq_msg_init_size (&msg, endpoint_uri_pair_.local.size ());
memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.local.c_str (), memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.local.c_str (),
endpoint_uri_pair_.local.size ()); endpoint_uri_pair_.local.size ());
zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE); zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
// Send remote endpoint URI in fourth frame (string) // Send remote endpoint URI in last frame (string)
zmq_msg_init_size (&msg, endpoint_uri_pair_.remote.size ()); zmq_msg_init_size (&msg, endpoint_uri_pair_.remote.size ());
memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.remote.c_str (), memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.remote.c_str (),
endpoint_uri_pair_.remote.size ()); endpoint_uri_pair_.remote.size ());
...@@ -1820,9 +1848,11 @@ void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_) ...@@ -1820,9 +1848,11 @@ void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
if (_monitor_socket) { if (_monitor_socket) {
if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED) if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
&& send_monitor_stopped_event_) && send_monitor_stopped_event_) {
monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, uint64_t values[1] = {0};
monitor_event (ZMQ_EVENT_MONITOR_STOPPED, values, 1,
endpoint_uri_pair_t ()); endpoint_uri_pair_t ());
}
zmq_close (_monitor_socket); zmq_close (_monitor_socket);
_monitor_socket = NULL; _monitor_socket = NULL;
_monitor_events = 0; _monitor_events = 0;
......
...@@ -200,12 +200,14 @@ class socket_base_t : public own_t, ...@@ -200,12 +200,14 @@ class socket_base_t : public own_t,
private: private:
// test if event should be sent and then dispatch it // test if event should be sent and then dispatch it
void event (const endpoint_uri_pair_t &endpoint_uri_pair_, void event (const endpoint_uri_pair_t &endpoint_uri_pair_,
uint64_t value_, uint64_t values_[],
uint64_t values_count_,
uint64_t type_); uint64_t type_);
// Socket event data dispatch // Socket event data dispatch
void monitor_event (uint64_t event_, void monitor_event (uint64_t event_,
uint64_t value_, uint64_t values_[],
uint64_t values_count_,
const endpoint_uri_pair_t &endpoint_uri_pair_) const; const endpoint_uri_pair_t &endpoint_uri_pair_) const;
// Monitor socket cleanup // Monitor socket cleanup
......
...@@ -210,10 +210,10 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, ...@@ -210,10 +210,10 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_,
assert (sizeof (uint64_t) == zmq_msg_size (&msg)); assert (sizeof (uint64_t) == zmq_msg_size (&msg));
uint64_t event; uint64_t event;
memcpy (&event, zmq_msg_data (&msg), sizeof event); memcpy (&event, zmq_msg_data (&msg), sizeof (event));
zmq_msg_close (&msg); zmq_msg_close (&msg);
// Second frame in message contains event value // Second frame in message contains the number of values
zmq_msg_init (&msg); zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) { if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
assert (errno == EAGAIN); assert (errno == EAGAIN);
...@@ -222,10 +222,25 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, ...@@ -222,10 +222,25 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_,
assert (zmq_msg_more (&msg)); assert (zmq_msg_more (&msg));
assert (sizeof (uint64_t) == zmq_msg_size (&msg)); assert (sizeof (uint64_t) == zmq_msg_size (&msg));
if (value_) uint64_t value_count;
memcpy (value_, zmq_msg_data (&msg), sizeof *value_); memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count));
zmq_msg_close (&msg); zmq_msg_close (&msg);
for (uint64_t i = 0; i < value_count; ++i) {
// Subsequent frames in message contain event values
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
assert (errno == EAGAIN);
return -1; // timed out or no message available
}
assert (zmq_msg_more (&msg));
assert (sizeof (uint64_t) == zmq_msg_size (&msg));
if (value_ && value_ + i)
memcpy (value_ + i, zmq_msg_data (&msg), sizeof (*value_));
zmq_msg_close (&msg);
}
// Third frame in message contains local address // Third frame in message contains local address
zmq_msg_init (&msg); zmq_msg_init (&msg);
int res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1; int res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment