Commit f87bf382 authored by Pieter Hintjens's avatar Pieter Hintjens

Fixed issue #443

parent d6e0ae24
...@@ -385,6 +385,20 @@ Default value:: 0 ...@@ -385,6 +385,20 @@ Default value:: 0
Applicable socket types:: ZMQ_ROUTER Applicable socket types:: ZMQ_ROUTER
ZMQ_XPUB_VERBOSE: Set the XPUB socket behavior
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'XPUB' socket behavior on new subscriptions. A value of '0' is the default
and passes only new subscription messages to upstream. A value of '1' passes all
subscription messages upstream.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 0
Applicable socket types:: ZMQ_XPUB
ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Override 'SO_KEEPALIVE' socket option(where supported by OS). Override 'SO_KEEPALIVE' socket option(where supported by OS).
......
...@@ -249,6 +249,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -249,6 +249,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_TCP_KEEPALIVE_INTVL 37 #define ZMQ_TCP_KEEPALIVE_INTVL 37
#define ZMQ_TCP_ACCEPT_FILTER 38 #define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_DELAY_ATTACH_ON_CONNECT 39 #define ZMQ_DELAY_ATTACH_ON_CONNECT 39
#define ZMQ_XPUB_VERBOSE 40
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_), socket_base_t (parent_, tid_, sid_),
verbose(false),
more (false) more (false)
{ {
options.type = ZMQ_XPUB; options.type = ZMQ_XPUB;
...@@ -70,7 +71,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -70,7 +71,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
// If the subscription is not a duplicate store it so that it can be // If the subscription is not a duplicate store it so that it can be
// passed to used on next recv call. // passed to used on next recv call.
if (unique && options.type != ZMQ_PUB) if (options.type == ZMQ_XPUB && (unique || verbose))
pending.push_back (blob_t (data, size)); pending.push_back (blob_t (data, size));
} }
...@@ -83,6 +84,21 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) ...@@ -83,6 +84,21 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
dist.activated (pipe_); dist.activated (pipe_);
} }
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ != ZMQ_XPUB_VERBOSE) {
errno = EINVAL;
return -1;
}
if (optvallen_ != sizeof (int) || *static_cast <const int*> (optval_) < 0) {
errno = EINVAL;
return -1;
}
verbose = *static_cast <const int*> (optval_);
return 0;
}
void zmq::xpub_t::xterminated (pipe_t *pipe_) void zmq::xpub_t::xterminated (pipe_t *pipe_)
{ {
// Remove the pipe from the trie. If there are topics that nobody // Remove the pipe from the trie. If there are topics that nobody
......
...@@ -54,6 +54,7 @@ namespace zmq ...@@ -54,6 +54,7 @@ namespace zmq
bool xhas_in (); bool xhas_in ();
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xterminated (zmq::pipe_t *pipe_); void xterminated (zmq::pipe_t *pipe_);
private: private:
...@@ -72,6 +73,10 @@ namespace zmq ...@@ -72,6 +73,10 @@ namespace zmq
// Distributor of messages holding the list of outbound pipes. // Distributor of messages holding the list of outbound pipes.
dist_t dist; dist_t dist;
// If true, send all subscription messages upstream, not just
// unique ones
bool verbose;
// True if we are in the middle of sending a multi-part message. // True if we are in the middle of sending a multi-part message.
bool more; bool more;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment