Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
588e0d2d
Commit
588e0d2d
authored
Jul 23, 2015
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #1486 from jimenezrick/fix-1478
Fix 1478: receive unsubscriptions in XPUB when verbose
parents
305c0758
ec5592db
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
61 additions
and
23 deletions
+61
-23
zmq_setsockopt.txt
doc/zmq_setsockopt.txt
+22
-2
zmq.h
include/zmq.h
+1
-0
mtrie.cpp
src/mtrie.cpp
+14
-9
mtrie.hpp
src/mtrie.hpp
+5
-4
xpub.cpp
src/xpub.cpp
+14
-7
xpub.hpp
src/xpub.hpp
+5
-1
No files found.
doc/zmq_setsockopt.txt
View file @
588e0d2d
...
...
@@ -14,8 +14,9 @@ SYNOPSIS
Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE,
ZMQ_LINGER, ZMQ_ROUTER_HANDOVER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER,
ZMQ_XPUB_VERBOSE, ZMQ_REQ_CORRELATE, ZMQ_REQ_RELAXED, ZMQ_SNDHWM
and ZMQ_RCVHWM, only take effect for subsequent socket bind/connects.
ZMQ_XPUB_VERBOSE, ZMQ_XPUB_VERBOSE_UNSUBSCRIBE, ZMQ_REQ_CORRELATE,
ZMQ_REQ_RELAXED, ZMQ_SNDHWM and ZMQ_RCVHWM, only take effect for
subsequent socket bind/connects.
Specifically, security options take effect for subsequent bind/connect calls,
and can be changed at any time to affect subsequent binds and/or connects.
...
...
@@ -839,6 +840,25 @@ Default value:: 0
Applicable socket types:: ZMQ_XPUB
ZMQ_XPUB_VERBOSE_UNSUBSCRIBE: provide all unsubscription messages on XPUB sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'XPUB' socket behaviour on new subscriptions and unsubscriptions.
A value of '0' is the default and passes only the last unsubscription message to
upstream. A value of '1' passes all unsubscription messages upstream.
This behaviour should be enabled in all the intermediary XPUB sockets if
ZMQ_XPUB_VERBOSE is also being used in order to allow the correct forwarding
of all the unsubscription messages.
NOTE: This behaviour only takes effect when ZMQ_XPUB_VERBOSE is also enabled.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 0
Applicable socket types:: ZMQ_XPUB
ZMQ_XPUB_MANUAL: change the subscription handling to manual
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'XPUB' socket subscription handling mode manual/automatic.
...
...
include/zmq.h
View file @
588e0d2d
...
...
@@ -319,6 +319,7 @@ ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg);
#define ZMQ_HEARTBEAT_IVL 75
#define ZMQ_HEARTBEAT_TTL 76
#define ZMQ_HEARTBEAT_TIMEOUT 77
#define ZMQ_XPUB_VERBOSE_UNSUBSCRIBE 78
/* Message options */
#define ZMQ_MORE 1
...
...
src/mtrie.cpp
View file @
588e0d2d
...
...
@@ -159,23 +159,28 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
void
zmq
::
mtrie_t
::
rm
(
pipe_t
*
pipe_
,
void
(
*
func_
)
(
unsigned
char
*
data_
,
size_t
size_
,
void
*
arg_
),
void
*
arg_
)
void
*
arg_
,
bool
call_on_uniq_
)
{
unsigned
char
*
buff
=
NULL
;
rm_helper
(
pipe_
,
&
buff
,
0
,
0
,
func_
,
arg_
);
rm_helper
(
pipe_
,
&
buff
,
0
,
0
,
func_
,
arg_
,
call_on_uniq_
);
free
(
buff
);
}
void
zmq
::
mtrie_t
::
rm_helper
(
pipe_t
*
pipe_
,
unsigned
char
**
buff_
,
size_t
buffsize_
,
size_t
maxbuffsize_
,
void
(
*
func_
)
(
unsigned
char
*
data_
,
size_t
size_
,
void
*
arg_
),
void
*
arg_
)
void
*
arg_
,
bool
call_on_uniq_
)
{
// Remove the subscription from this node.
if
(
pipes
&&
pipes
->
erase
(
pipe_
)
&&
pipes
->
empty
())
{
func_
(
*
buff_
,
buffsize_
,
arg_
);
delete
pipes
;
pipes
=
0
;
if
(
pipes
&&
pipes
->
erase
(
pipe_
))
{
if
(
!
call_on_uniq_
||
pipes
->
empty
())
{
func_
(
*
buff_
,
buffsize_
,
arg_
);
}
if
(
pipes
->
empty
())
{
delete
pipes
;
pipes
=
0
;
}
}
// Adjust the buffer.
...
...
@@ -194,7 +199,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
(
*
buff_
)
[
buffsize_
]
=
min
;
buffsize_
++
;
next
.
node
->
rm_helper
(
pipe_
,
buff_
,
buffsize_
,
maxbuffsize_
,
func_
,
arg_
);
func_
,
arg_
,
call_on_uniq_
);
// Prune the node if it was made redundant by the removal
if
(
next
.
node
->
is_redundant
())
{
...
...
@@ -217,7 +222,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
(
*
buff_
)
[
buffsize_
]
=
min
+
c
;
if
(
next
.
table
[
c
])
{
next
.
table
[
c
]
->
rm_helper
(
pipe_
,
buff_
,
buffsize_
+
1
,
maxbuffsize_
,
func_
,
arg_
);
maxbuffsize_
,
func_
,
arg_
,
call_on_uniq_
);
// Prune redundant nodes from the mtrie
if
(
next
.
table
[
c
]
->
is_redundant
())
{
...
...
src/mtrie.hpp
View file @
588e0d2d
...
...
@@ -54,11 +54,12 @@ namespace zmq
bool
add
(
unsigned
char
*
prefix_
,
size_t
size_
,
zmq
::
pipe_t
*
pipe_
);
// Remove all subscriptions for a specific peer from the trie.
// If there are no subscriptions left on some topics, invoke the
// supplied callback function.
// The call_on_uniq_ flag controls if the callback is invoked
// when there are no subscriptions left on some topics or on
// every removal.
void
rm
(
zmq
::
pipe_t
*
pipe_
,
void
(
*
func_
)
(
unsigned
char
*
data_
,
size_t
size_
,
void
*
arg_
),
void
*
arg_
);
void
*
arg_
,
bool
call_on_uniq_
);
// Remove specific subscription from the trie. Return true is it was
// actually removed rather than de-duplicated.
...
...
@@ -75,7 +76,7 @@ namespace zmq
void
rm_helper
(
zmq
::
pipe_t
*
pipe_
,
unsigned
char
**
buff_
,
size_t
buffsize_
,
size_t
maxbuffsize_
,
void
(
*
func_
)
(
unsigned
char
*
data_
,
size_t
size_
,
void
*
arg_
),
void
*
arg_
);
void
*
arg_
,
bool
call_on_uniq_
);
bool
rm_helper
(
unsigned
char
*
prefix_
,
size_t
size_
,
zmq
::
pipe_t
*
pipe_
);
bool
is_redundant
()
const
;
...
...
src/xpub.cpp
View file @
588e0d2d
...
...
@@ -36,7 +36,8 @@
zmq
::
xpub_t
::
xpub_t
(
class
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid_
)
:
socket_base_t
(
parent_
,
tid_
,
sid_
),
verbose
(
false
),
verbose_subs
(
false
),
verbose_unsubs
(
false
),
more
(
false
),
lossy
(
true
),
manual
(
false
),
...
...
@@ -101,9 +102,11 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
else
unique
=
subscriptions
.
add
(
data
+
1
,
size
-
1
,
pipe_
);
// If the subscription is not a duplicate store it so that it can be
// passed to used on next recv call. (Unsubscribe is not verbose.)
if
(
options
.
type
==
ZMQ_XPUB
&&
(
unique
||
(
*
data
&&
verbose
)))
{
// If the (un)subscription is not a duplicate store it so that it can be
// passed to the user on next recv call unless verbose mode is enabled
// which makes to pass always these messages.
if
(
options
.
type
==
ZMQ_XPUB
&&
(
unique
||
(
*
data
==
1
&&
verbose_subs
)
||
(
*
data
==
0
&&
verbose_unsubs
&&
verbose_subs
)))
{
pending_data
.
push_back
(
blob_t
(
data
,
size
));
pending_flags
.
push_back
(
0
);
}
...
...
@@ -126,7 +129,8 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
int
zmq
::
xpub_t
::
xsetsockopt
(
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
)
{
if
(
option_
==
ZMQ_XPUB_VERBOSE
||
option_
==
ZMQ_XPUB_NODROP
||
option_
==
ZMQ_XPUB_MANUAL
)
if
(
option_
==
ZMQ_XPUB_VERBOSE
||
option_
==
ZMQ_XPUB_VERBOSE_UNSUBSCRIBE
||
option_
==
ZMQ_XPUB_NODROP
||
option_
==
ZMQ_XPUB_MANUAL
)
{
if
(
optvallen_
!=
sizeof
(
int
)
||
*
static_cast
<
const
int
*>
(
optval_
)
<
0
)
{
errno
=
EINVAL
;
...
...
@@ -134,7 +138,10 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
}
if
(
option_
==
ZMQ_XPUB_VERBOSE
)
verbose
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
verbose_subs
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
else
if
(
option_
==
ZMQ_XPUB_VERBOSE_UNSUBSCRIBE
)
verbose_unsubs
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
else
if
(
option_
==
ZMQ_XPUB_NODROP
)
lossy
=
(
*
static_cast
<
const
int
*>
(
optval_
)
==
0
);
...
...
@@ -173,7 +180,7 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
// Remove the pipe from the trie. If there are topics that nobody
// is interested in anymore, send corresponding unsubscriptions
// upstream.
subscriptions
.
rm
(
pipe_
,
send_unsubscription
,
this
);
subscriptions
.
rm
(
pipe_
,
send_unsubscription
,
this
,
!
verbose_unsubs
);
dist
.
pipe_terminated
(
pipe_
);
}
...
...
src/xpub.hpp
View file @
588e0d2d
...
...
@@ -84,7 +84,11 @@ namespace zmq
// If true, send all subscription messages upstream, not just
// unique ones
bool
verbose
;
bool
verbose_subs
;
// If true, send all unsubscription messages upstream, not just
// unique ones
bool
verbose_unsubs
;
// True if we are in the middle of sending a multi-part message.
bool
more
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment