Commit 2f98f703 authored by imkcy9's avatar imkcy9 Committed by Luca Boccassi

Support XPub socket send last value caching to last subscription pipe with…

 Support XPub socket send last value caching to last subscription pipe with ZMQ_XPUB_MANUAL_LAST_VALUE. (#3511)

* Add ZMQ_XPUB_MANUAL_LAST_VALUE

* Surpport xpub send last value caching to one pipe with ZMQ_XPUB_MANUAL_LAST_VALUE

* Add test_xpubub_manual_last_value

* Add relicense and doc
parent 6b51f033
# Permission to Relicense under MPLv2 or any other OSI approved license chosen by the current ZeroMQ BDFL
This is a statement by Chengye Ke
that grants permission to relicense its copyrights in the libzmq C++
library (ZeroMQ) under the Mozilla Public License v2 (MPLv2) or any other
Open Source Initiative approved license chosen by the current ZeroMQ
BDFL (Benevolent Dictator for Life).
A portion of the commits made by the Github handle "imkcy9", with
commit author "Chengye Ke <imkcy9@icloud.com>" or
"Chengye Ke <imkcy9@gmail.com>", are copyright of Chengye Ke.
This document hereby grants the libzmq project team to relicense libzmq,
including all past, present and future contributions of the author listed above.
Chengye Ke
2019/05/18
\ No newline at end of file
...@@ -1073,6 +1073,23 @@ Default value:: 0 ...@@ -1073,6 +1073,23 @@ Default value:: 0
Applicable socket types:: ZMQ_XPUB Applicable socket types:: ZMQ_XPUB
ZMQ_XPUB_MANUAL_LAST_VALUE: change the subscription handling to manual
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This option is similar to ZMQ_XPUB_MANUAL.
What is the difference, ZMQ_XPUB_MANUAL_LAST_VALUE sets the 'XPUB' socket
behaviour to send the first message to the last subscriber after the 'XPUB' socket
recieve a subscription and call setsockopt with ZMQ_SUBSCRIBE on 'XPUB' socket.
This prevent duplicated message when use last value caching(LVC).
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 0
Applicable socket types:: ZMQ_XPUB
ZMQ_XPUB_NODROP: do not silently drop messages if SENDHWM is reached ZMQ_XPUB_NODROP: do not silently drop messages if SENDHWM is reached
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'XPUB' socket behaviour to return error EAGAIN if SENDHWM is Sets the 'XPUB' socket behaviour to return error EAGAIN if SENDHWM is
......
...@@ -655,6 +655,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); ...@@ -655,6 +655,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_METADATA 95 #define ZMQ_METADATA 95
#define ZMQ_MULTICAST_LOOP 96 #define ZMQ_MULTICAST_LOOP 96
#define ZMQ_ROUTER_NOTIFY 97 #define ZMQ_ROUTER_NOTIFY 97
#define ZMQ_XPUB_MANUAL_LAST_VALUE 98
/* DRAFT Context options */ /* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10 #define ZMQ_ZERO_COPY_RECV 10
......
...@@ -44,6 +44,9 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -44,6 +44,9 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
_more (false), _more (false),
_lossy (true), _lossy (true),
_manual (false), _manual (false),
#ifdef ZMQ_BUILD_DRAFT_API
_send_last_pipe (false),
#endif
_pending_pipes (), _pending_pipes (),
_welcome_msg () _welcome_msg ()
{ {
...@@ -190,6 +193,9 @@ int zmq::xpub_t::xsetsockopt (int option_, ...@@ -190,6 +193,9 @@ int zmq::xpub_t::xsetsockopt (int option_,
size_t optvallen_) size_t optvallen_)
{ {
if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSER if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSER
#ifdef ZMQ_BUILD_DRAFT_API
|| option_ == ZMQ_XPUB_MANUAL_LAST_VALUE
#endif
|| option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL) { || option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL) {
if (optvallen_ != sizeof (int) if (optvallen_ != sizeof (int)
|| *static_cast<const int *> (optval_) < 0) { || *static_cast<const int *> (optval_) < 0) {
...@@ -202,6 +208,11 @@ int zmq::xpub_t::xsetsockopt (int option_, ...@@ -202,6 +208,11 @@ int zmq::xpub_t::xsetsockopt (int option_,
} else if (option_ == ZMQ_XPUB_VERBOSER) { } else if (option_ == ZMQ_XPUB_VERBOSER) {
_verbose_subs = (*static_cast<const int *> (optval_) != 0); _verbose_subs = (*static_cast<const int *> (optval_) != 0);
_verbose_unsubs = _verbose_subs; _verbose_unsubs = _verbose_subs;
#ifdef ZMQ_BUILD_DRAFT_API
} else if (option_ == ZMQ_XPUB_MANUAL_LAST_VALUE) {
_manual = (*static_cast<const int *> (optval_) != 0);
_send_last_pipe = _manual;
#endif
} else if (option_ == ZMQ_XPUB_NODROP) } else if (option_ == ZMQ_XPUB_NODROP)
_lossy = (*static_cast<const int *> (optval_) == 0); _lossy = (*static_cast<const int *> (optval_) == 0);
else if (option_ == ZMQ_XPUB_MANUAL) else if (option_ == ZMQ_XPUB_MANUAL)
...@@ -265,14 +276,33 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, xpub_t *self_) ...@@ -265,14 +276,33 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, xpub_t *self_)
self_->_dist.match (pipe_); self_->_dist.match (pipe_);
} }
#ifdef ZMQ_BUILD_DRAFT_API
void zmq::xpub_t::mark_last_pipe_as_matching (pipe_t *pipe_, xpub_t *self_)
{
if (self_->_last_pipe == pipe_)
self_->_dist.match (pipe_);
}
#endif
int zmq::xpub_t::xsend (msg_t *msg_) int zmq::xpub_t::xsend (msg_t *msg_)
{ {
bool msg_more = (msg_->flags () & msg_t::more) != 0; bool msg_more = (msg_->flags () & msg_t::more) != 0;
// For the first part of multi-part message, find the matching pipes. // For the first part of multi-part message, find the matching pipes.
if (!_more) { if (!_more) {
#ifdef ZMQ_BUILD_DRAFT_API
if (_manual && _last_pipe && _send_last_pipe) {
_subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
msg_->size (), mark_last_pipe_as_matching,
this);
_last_pipe = NULL;
} else
_subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
msg_->size (), mark_as_matching, this);
#else
_subscriptions.match (static_cast<unsigned char *> (msg_->data ()), _subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
msg_->size (), mark_as_matching, this); msg_->size (), mark_as_matching, this);
#endif
// If inverted matching is used, reverse the selection now // If inverted matching is used, reverse the selection now
if (options.invert_matching) { if (options.invert_matching) {
_dist.reverse_match (); _dist.reverse_match ();
......
...@@ -99,6 +99,14 @@ class xpub_t : public socket_base_t ...@@ -99,6 +99,14 @@ class xpub_t : public socket_base_t
// Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE // Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE
bool _manual; bool _manual;
#ifdef ZMQ_BUILD_DRAFT_API
// Send message to the last pipe, only used if xpub is on manual and after calling set option with ZMQ_SUBSCRIBE
bool _send_last_pipe;
// Function to be applied to match the last pipe.
static void mark_last_pipe_as_matching (zmq::pipe_t *pipe_, xpub_t *arg_);
#endif
// Last pipe that sent subscription message, only used if xpub is on manual // Last pipe that sent subscription message, only used if xpub is on manual
pipe_t *_last_pipe; pipe_t *_last_pipe;
......
...@@ -52,6 +52,7 @@ ...@@ -52,6 +52,7 @@
#define ZMQ_METADATA 95 #define ZMQ_METADATA 95
#define ZMQ_MULTICAST_LOOP 96 #define ZMQ_MULTICAST_LOOP 96
#define ZMQ_ROUTER_NOTIFY 97 #define ZMQ_ROUTER_NOTIFY 97
#define ZMQ_XPUB_MANUAL_LAST_VALUE 98
/* DRAFT Context options */ /* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10 #define ZMQ_ZERO_COPY_RECV 10
......
...@@ -153,6 +153,7 @@ if(ENABLE_DRAFTS) ...@@ -153,6 +153,7 @@ if(ENABLE_DRAFTS)
test_dgram test_dgram
test_app_meta test_app_meta
test_router_notify test_router_notify
test_xpub_manual_last_value
) )
endif() endif()
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment