Commit baea4066 authored by Fedor Sheremetyev's avatar Fedor Sheremetyev

Store manual subscriptions in XPUB and send them out on pipe termination.

parent 813c7381
...@@ -92,6 +92,12 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -92,6 +92,12 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
if (size > 0 && (*data == 0 || *data == 1)) { if (size > 0 && (*data == 0 || *data == 1)) {
if (manual) if (manual)
{ {
// Store manual subscription to use on termination
if (*data == 0)
manual_subscriptions.rm(data + 1, size - 1, pipe_);
else
manual_subscriptions.add(data + 1, size - 1, pipe_);
pending_pipes.push_back(pipe_); pending_pipes.push_back(pipe_);
pending_data.push_back(blob_t(data, size)); pending_data.push_back(blob_t(data, size));
pending_metadata.push_back(sub.metadata()); pending_metadata.push_back(sub.metadata());
...@@ -191,10 +197,19 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, ...@@ -191,10 +197,19 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
{ {
// Remove the pipe from the trie. If there are topics that nobody if (manual)
// is interested in anymore, send corresponding unsubscriptions {
// upstream. // Remove the pipe from the trie and send corresponding manual
subscriptions.rm (pipe_, send_unsubscription, this, !(verbose_unsubs || manual)); // unsubscriptions upstream.
manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
}
else
{
// Remove the pipe from the trie. If there are topics that nobody
// is interested in anymore, send corresponding unsubscriptions
// upstream.
subscriptions.rm (pipe_, send_unsubscription, this, !verbose_unsubs);
}
dist.pipe_terminated (pipe_); dist.pipe_terminated (pipe_);
} }
......
...@@ -79,6 +79,9 @@ namespace zmq ...@@ -79,6 +79,9 @@ namespace zmq
// List of all subscriptions mapped to corresponding pipes. // List of all subscriptions mapped to corresponding pipes.
mtrie_t subscriptions; mtrie_t subscriptions;
// List of manual subscriptions mapped to corresponding pipes.
mtrie_t manual_subscriptions;
// Distributor of messages holding the list of outbound pipes. // Distributor of messages holding the list of outbound pipes.
dist_t dist; dist_t dist;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment