Commit 2cdad3d0 authored by Szekely Gyorgy's avatar Szekely Gyorgy Committed by Luca Boccassi

Add ZMQ_ROUTER_NOTIFY draft socket option (#3213)

* Add ZMQ_ROUTER_NOTIFY draft socket option
parent 4738bed5
...@@ -888,7 +888,8 @@ test_apps += tests/test_poller \ ...@@ -888,7 +888,8 @@ test_apps += tests/test_poller \
tests/test_radio_dish \ tests/test_radio_dish \
tests/test_scatter_gather \ tests/test_scatter_gather \
tests/test_dgram \ tests/test_dgram \
tests/test_app_meta tests/test_app_meta \
tests/test_router_notify
tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = src/libzmq.la ${UNITY_LIBS} tests_test_poller_LDADD = src/libzmq.la ${UNITY_LIBS}
...@@ -918,6 +919,10 @@ tests_test_dgram_LDADD = src/libzmq.la ...@@ -918,6 +919,10 @@ tests_test_dgram_LDADD = src/libzmq.la
tests_test_app_meta_SOURCES = tests/test_app_meta.cpp tests_test_app_meta_SOURCES = tests/test_app_meta.cpp
tests_test_app_meta_LDADD = src/libzmq.la ${UNITY_LIBS} tests_test_app_meta_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_app_meta_CPPFLAGS = ${UNITY_CPPFLAGS} tests_test_app_meta_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_router_notify_SOURCES = tests/test_router_notify.cpp
tests_test_router_notify_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_router_notify_CPPFLAGS = ${UNITY_CPPFLAGS}
endif endif
if ENABLE_STATIC if ENABLE_STATIC
......
...@@ -904,6 +904,23 @@ Option value unit:: 0, 1 ...@@ -904,6 +904,23 @@ Option value unit:: 0, 1
Default value:: 1 Default value:: 1
Applicable socket types:: ZMQ_RADIO, when using UDP multicast transport Applicable socket types:: ZMQ_RADIO, when using UDP multicast transport
ZMQ_ROUTER_NOTIFY: Retrieve router socket notification settings
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the current notification settings of a router socket. The returned
value is a bitmask composed of ZMQ_NOTIFY_CONNECT and ZMQ_NOTIFY_DISCONNECT
flags, meaning connect and disconnect notifications are enabled, respectively.
A value of '0' means the notifications are off.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: 0, ZMQ_NOTIFY_CONNECT, ZMQ_NOTIFY_DISCONNECT, ZMQ_NOTIFY_CONNECT|ZMQ_NOTIFY_DISCONNECT
Default value:: 0
Applicable socket types:: ZMQ_ROUTER
RETURN VALUE RETURN VALUE
------------ ------------
The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it
......
...@@ -1289,6 +1289,24 @@ Option value unit:: 0, 1 ...@@ -1289,6 +1289,24 @@ Option value unit:: 0, 1
Default value:: 1 Default value:: 1
Applicable socket types:: ZMQ_RADIO, when using UDP multicast transport Applicable socket types:: ZMQ_RADIO, when using UDP multicast transport
ZMQ_ROUTER_NOTIFY: Send connect and disconnect notifications
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Enable connect and disconnect notifications on a ROUTER socket.
When enabled, the socket delivers a zero-length message (with routing-id
as first frame) when a peer connects or disconnects. It's possible
to notify both events for a peer by OR-ing the flag values. This option
only applies to stream oriented (tcp, ipc) transports.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: 0, ZMQ_NOTIFY_CONNECT, ZMQ_NOTIFY_DISCONNECT, ZMQ_NOTIFY_CONNECT|ZMQ_NOTIFY_DISCONNECT
Default value:: 0
Applicable socket types:: ZMQ_ROUTER
RETURN VALUE RETURN VALUE
------------ ------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
......
...@@ -618,6 +618,8 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); ...@@ -618,6 +618,8 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_LOOPBACK_FASTPATH 94 #define ZMQ_LOOPBACK_FASTPATH 94
#define ZMQ_METADATA 95 #define ZMQ_METADATA 95
#define ZMQ_MULTICAST_LOOP 96 #define ZMQ_MULTICAST_LOOP 96
#define ZMQ_ROUTER_NOTIFY 97
/* DRAFT 0MQ socket events and monitoring */ /* DRAFT 0MQ socket events and monitoring */
/* Unspecified system errors during handshake. Event value is an errno. */ /* Unspecified system errors during handshake. Event value is an errno. */
...@@ -678,6 +680,10 @@ ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg); ...@@ -678,6 +680,10 @@ ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg);
#define ZMQ_MSG_PROPERTY_USER_ID "User-Id" #define ZMQ_MSG_PROPERTY_USER_ID "User-Id"
#define ZMQ_MSG_PROPERTY_PEER_ADDRESS "Peer-Address" #define ZMQ_MSG_PROPERTY_PEER_ADDRESS "Peer-Address"
/* Router notify options */
#define ZMQ_NOTIFY_CONNECT 1
#define ZMQ_NOTIFY_DISCONNECT 2
/******************************************************************************/ /******************************************************************************/
/* Poller polling on sockets,fd and thread-safe sockets */ /* Poller polling on sockets,fd and thread-safe sockets */
/******************************************************************************/ /******************************************************************************/
......
...@@ -243,7 +243,8 @@ zmq::options_t::options_t () : ...@@ -243,7 +243,8 @@ zmq::options_t::options_t () :
zap_enforce_domain (false), zap_enforce_domain (false),
loopback_fastpath (false), loopback_fastpath (false),
multicast_loop (true), multicast_loop (true),
zero_copy (true) zero_copy (true),
router_notify (0)
{ {
memset (curve_public_key, 0, CURVE_KEYSIZE); memset (curve_public_key, 0, CURVE_KEYSIZE);
memset (curve_secret_key, 0, CURVE_KEYSIZE); memset (curve_secret_key, 0, CURVE_KEYSIZE);
...@@ -1149,6 +1150,16 @@ int zmq::options_t::getsockopt (int option_, ...@@ -1149,6 +1150,16 @@ int zmq::options_t::getsockopt (int option_,
} }
break; break;
#ifdef ZMQ_BUILD_DRAFT_API
case ZMQ_ROUTER_NOTIFY:
if (is_int) {
*value = router_notify;
return 0;
}
break;
#endif
default: default:
#if defined(ZMQ_ACT_MILITANT) #if defined(ZMQ_ACT_MILITANT)
malformed = false; malformed = false;
......
...@@ -261,6 +261,9 @@ struct options_t ...@@ -261,6 +261,9 @@ struct options_t
// Use zero copy strategy for storing message content when decoding. // Use zero copy strategy for storing message content when decoding.
bool zero_copy; bool zero_copy;
// Router socket ZMQ_NOTIFY_CONNECT/ZMQ_NOTIFY_DISCONNECT notifications
int router_notify;
// Application metadata // Application metadata
std::map<std::string, std::string> app_metadata; std::map<std::string, std::string> app_metadata;
}; };
......
...@@ -139,6 +139,16 @@ int zmq::router_t::xsetsockopt (int option_, ...@@ -139,6 +139,16 @@ int zmq::router_t::xsetsockopt (int option_,
} }
break; break;
#ifdef ZMQ_BUILD_DRAFT_API
case ZMQ_ROUTER_NOTIFY:
if (is_int && value >= 0
&& value <= (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT)) {
options.router_notify = value;
return 0;
}
break;
#endif
default: default:
return routing_socket_base_t::xsetsockopt (option_, optval_, return routing_socket_base_t::xsetsockopt (option_, optval_,
optvallen_); optvallen_);
......
...@@ -217,6 +217,12 @@ void zmq::session_base_t::flush () ...@@ -217,6 +217,12 @@ void zmq::session_base_t::flush ()
_pipe->flush (); _pipe->flush ();
} }
void zmq::session_base_t::rollback ()
{
if (_pipe)
_pipe->rollback ();
}
void zmq::session_base_t::clean_pipes () void zmq::session_base_t::clean_pipes ()
{ {
zmq_assert (_pipe != NULL); zmq_assert (_pipe != NULL);
......
...@@ -60,6 +60,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events ...@@ -60,6 +60,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
// Following functions are the interface exposed towards the engine. // Following functions are the interface exposed towards the engine.
virtual void reset (); virtual void reset ();
void flush (); void flush ();
void rollback ();
void engine_error (zmq::stream_engine_t::error_reason_t reason_); void engine_error (zmq::stream_engine_t::error_reason_t reason_);
// i_pipe_events interface implementation. // i_pipe_events interface implementation.
......
...@@ -897,6 +897,8 @@ void zmq::stream_engine_t::mechanism_ready () ...@@ -897,6 +897,8 @@ void zmq::stream_engine_t::mechanism_ready ()
_has_heartbeat_timer = true; _has_heartbeat_timer = true;
} }
bool flush_session = false;
if (_options.recv_routing_id) { if (_options.recv_routing_id) {
msg_t routing_id; msg_t routing_id;
_mechanism->peer_routing_id (&routing_id); _mechanism->peer_routing_id (&routing_id);
...@@ -908,8 +910,25 @@ void zmq::stream_engine_t::mechanism_ready () ...@@ -908,8 +910,25 @@ void zmq::stream_engine_t::mechanism_ready ()
return; return;
} }
errno_assert (rc == 0); errno_assert (rc == 0);
_session->flush (); flush_session = true;
}
if (_options.router_notify & ZMQ_NOTIFY_CONNECT) {
msg_t connect_notification;
connect_notification.init ();
const int rc = _session->push_msg (&connect_notification);
if (rc == -1 && errno == EAGAIN) {
// If the write is failing at this stage with
// an EAGAIN the pipe must be being shut down,
// so we can just bail out of the notification.
return;
} }
errno_assert (rc == 0);
flush_session = true;
}
if (flush_session)
_session->flush ();
_next_msg = &stream_engine_t::pull_and_encode; _next_msg = &stream_engine_t::pull_and_encode;
_process_msg = &stream_engine_t::write_credential; _process_msg = &stream_engine_t::write_credential;
...@@ -1038,6 +1057,18 @@ void zmq::stream_engine_t::error (error_reason_t reason_) ...@@ -1038,6 +1057,18 @@ void zmq::stream_engine_t::error (error_reason_t reason_)
terminator.close (); terminator.close ();
} }
zmq_assert (_session); zmq_assert (_session);
if ((_options.router_notify & ZMQ_NOTIFY_DISCONNECT) && !_handshaking) {
// For router sockets with disconnect notification, rollback
// any incomplete message in the pipe, and push the disconnect
// notification message.
_session->rollback ();
msg_t disconnect_notification;
disconnect_notification.init ();
_session->push_msg (&disconnect_notification);
}
#ifdef ZMQ_BUILD_DRAFT_API #ifdef ZMQ_BUILD_DRAFT_API
// protocol errors have been signaled already at the point where they occurred // protocol errors have been signaled already at the point where they occurred
if (reason_ != protocol_error if (reason_ != protocol_error
......
...@@ -55,6 +55,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_); ...@@ -55,6 +55,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_);
#define ZMQ_LOOPBACK_FASTPATH 94 #define ZMQ_LOOPBACK_FASTPATH 94
#define ZMQ_METADATA 95 #define ZMQ_METADATA 95
#define ZMQ_MULTICAST_LOOP 96 #define ZMQ_MULTICAST_LOOP 96
#define ZMQ_ROUTER_NOTIFY 97
/* DRAFT 0MQ socket events and monitoring */ /* DRAFT 0MQ socket events and monitoring */
/* Unspecified system errors during handshake. Event value is an errno. */ /* Unspecified system errors during handshake. Event value is an errno. */
...@@ -115,6 +116,10 @@ const char *zmq_msg_group (zmq_msg_t *msg_); ...@@ -115,6 +116,10 @@ const char *zmq_msg_group (zmq_msg_t *msg_);
#define ZMQ_MSG_PROPERTY_USER_ID "User-Id" #define ZMQ_MSG_PROPERTY_USER_ID "User-Id"
#define ZMQ_MSG_PROPERTY_PEER_ADDRESS "Peer-Address" #define ZMQ_MSG_PROPERTY_PEER_ADDRESS "Peer-Address"
/* Router notify options */
#define ZMQ_NOTIFY_CONNECT 1
#define ZMQ_NOTIFY_DISCONNECT 2
/******************************************************************************/ /******************************************************************************/
/* Poller polling on sockets,fd and thread-safe sockets */ /* Poller polling on sockets,fd and thread-safe sockets */
/******************************************************************************/ /******************************************************************************/
......
...@@ -136,6 +136,7 @@ IF (ENABLE_DRAFTS) ...@@ -136,6 +136,7 @@ IF (ENABLE_DRAFTS)
test_scatter_gather test_scatter_gather
test_dgram test_dgram
test_app_meta test_app_meta
test_router_notify
) )
ENDIF (ENABLE_DRAFTS) ENDIF (ENABLE_DRAFTS)
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment