/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31 32
#include <string.h>

33 34
#include "xpub.hpp"
#include "pipe.hpp"
35 36
#include "err.hpp"
#include "msg.hpp"
37
#include "macros.hpp"
38
#include "generic_mtrie_impl.hpp"
39

40 41
//  Builds an XPUB socket. Verbose notifications and manual mode start
//  disabled, the socket starts lossy, and the welcome message starts as an
//  empty message.
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    socket_base_t (parent_, tid_, sid_),
    verbose_subs (false),
    verbose_unsubs (false),
    more (false),
    lossy (true),
    manual (false),
    pending_pipes (),
    welcome_msg ()
{
    //  Identify this socket as XPUB in its options.
    options.type = ZMQ_XPUB;

    //  No subscription request has been handed to the user yet, so there is
    //  no pipe for a manual (un)subscribe to act upon.
    last_pipe = NULL;

    //  Put the welcome message into an initialised state so that the
    //  destructor and xsetsockopt can close it unconditionally.
    welcome_msg.init ();
}

//  Releases the welcome message and every metadata reference still held by
//  the queue of notifications the user never received.
zmq::xpub_t::~xpub_t ()
{
    welcome_msg.close ();

    //  xread_activated () takes one reference per queued metadata pointer and
    //  only xrecv () gives it back. Whatever is still queued here would
    //  otherwise keep its reference forever, so drop it now.
    //  NOTE(review): relies on drop_ref () returning true once the last
    //  reference is gone, in which case the object must be deleted by the
    //  caller - confirm against metadata_t.
    while (!pending_metadata.empty ()) {
        metadata_t *const metadata = pending_metadata.front ();
        pending_metadata.pop_front ();
        if (metadata && metadata->drop_ref ())
            delete metadata;
    }
}

60
//  Registers a freshly attached pipe with the socket.
//
//  pipe_             the new pipe; must not be NULL.
//  subscribe_to_all_ when true, the pipe is subscribed to the empty prefix,
//                    i.e. to every message.
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
    zmq_assert (pipe_);
    dist.attach (pipe_);

    //  An implicit "subscribe to everything" is expressed as a zero-length
    //  prefix in the subscription trie.
    if (subscribe_to_all_)
        subscriptions.add (NULL, 0, pipe_);

    //  When a non-empty welcome message is configured, push a private copy
    //  of it down the new pipe right away.
    if (welcome_msg.size () != 0) {
        msg_t greeting;
        greeting.init ();
        const int rc = greeting.copy (welcome_msg);
        errno_assert (rc == 0);
        const bool written = pipe_->write (&greeting);
        zmq_assert (written);
        pipe_->flush ();
    }

    //  A pipe is readable as soon as it is attached, so pick up any
    //  subscriptions that may already be waiting on it.
    xread_activated (pipe_);
}

//  Reads everything currently available on pipe_ and either applies it as a
//  subscription change or queues it for delivery to the user via xrecv ().
void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{
    msg_t incoming;
    while (pipe_->read (&incoming)) {
        unsigned char *const payload =
          static_cast<unsigned char *> (incoming.data ());
        const size_t payload_size = incoming.size ();
        metadata_t *const meta = incoming.metadata ();

        //  A non-empty message whose first byte is 0 or 1 is a request:
        //  0 removes a subscription, 1 adds one; the remaining bytes are
        //  the topic prefix.
        const bool is_request =
          payload_size > 0 && (payload[0] == 0 || payload[0] == 1);

        if (!is_request) {
            //  Anything else is a user message coming upstream from an xsub
            //  socket; queue it unchanged, keeping its flags.
            pending_data.push_back (blob_t (payload, payload_size));
            if (meta)
                meta->add_ref ();
            pending_metadata.push_back (meta);
            pending_flags.push_back (incoming.flags ());
        } else if (manual) {
            //  Manual mode: remember the request in the manual trie (it is
            //  consulted again when the pipe terminates) and always pass the
            //  request, together with its originating pipe, to the user.
            if (payload[0] == 1)
                manual_subscriptions.add (payload + 1, payload_size - 1,
                                          pipe_);
            else
                manual_subscriptions.rm (payload + 1, payload_size - 1,
                                         pipe_);

            pending_pipes.push_back (pipe_);
            pending_data.push_back (blob_t (payload, payload_size));
            if (meta)
                meta->add_ref ();
            pending_metadata.push_back (meta);
            pending_flags.push_back (0);
        } else {
            //  Automatic mode: apply the request to the real trie and work
            //  out whether the user has to be told about it.
            bool notify;
            if (payload[0] == 1) {
                const bool first_added =
                  subscriptions.add (payload + 1, payload_size - 1, pipe_);
                notify = first_added || verbose_subs;
            } else {
                const mtrie_t::rm_result outcome =
                  subscriptions.rm (payload + 1, payload_size - 1, pipe_);
                //  TODO reconsider what to do if outcome == mtrie_t::not_found
                notify = outcome != mtrie_t::values_remain || verbose_unsubs;
            }

            //  Only a new subscription, a removed subscription or a verbose
            //  setting produces a notification, and only on XPUB sockets.
            if (options.type == ZMQ_XPUB && notify) {
                pending_data.push_back (blob_t (payload, payload_size));
                if (meta)
                    meta->add_ref ();
                pending_metadata.push_back (meta);
                pending_flags.push_back (0);
            }
        }

        incoming.close ();
    }
}

146
//  Called when pipe_ becomes writable again; the event is simply handed on
//  to dist.
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
{
    dist.activated (pipe_);
}

151 152 153
int zmq::xpub_t::xsetsockopt (int option_,
                              const void *optval_,
                              size_t optvallen_)
154
{
155 156 157 158
    if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSER
        || option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL) {
        if (optvallen_ != sizeof (int)
            || *static_cast<const int *> (optval_) < 0) {
159 160 161
            errno = EINVAL;
            return -1;
        }
162
        if (option_ == ZMQ_XPUB_VERBOSE) {
163
            verbose_subs = (*static_cast<const int *> (optval_) != 0);
164
            verbose_unsubs = 0;
165 166
        } else if (option_ == ZMQ_XPUB_VERBOSER) {
            verbose_subs = (*static_cast<const int *> (optval_) != 0);
167
            verbose_unsubs = verbose_subs;
168 169 170 171 172
        } else if (option_ == ZMQ_XPUB_NODROP)
            lossy = (*static_cast<const int *> (optval_) == 0);
        else if (option_ == ZMQ_XPUB_MANUAL)
            manual = (*static_cast<const int *> (optval_) != 0);
    } else if (option_ == ZMQ_SUBSCRIBE && manual) {
173
        if (last_pipe != NULL)
174 175 176
            subscriptions.add ((unsigned char *) optval_, optvallen_,
                               last_pipe);
    } else if (option_ == ZMQ_UNSUBSCRIBE && manual) {
177
        if (last_pipe != NULL)
178
            subscriptions.rm ((unsigned char *) optval_, optvallen_, last_pipe);
179
    } else if (option_ == ZMQ_XPUB_WELCOME_MSG) {
180
        welcome_msg.close ();
181 182

        if (optvallen_ > 0) {
183
            int rc = welcome_msg.init_size (optvallen_);
184
            errno_assert (rc == 0);
185

186
            unsigned char *data = (unsigned char *) welcome_msg.data ();
187
            memcpy (data, optval_, optvallen_);
188
        } else
189
            welcome_msg.init ();
190
    } else {
191 192 193
        errno = EINVAL;
        return -1;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
194 195 196
    return 0;
}

197
//  No-op callback with the shape expected by the trie's pipe-removal
//  routine. xpipe_terminated () passes it when a pipe has to be removed
//  without producing any unsubscription.
static void stub (zmq::mtrie_t::prefix_t data_, size_t size_, void *arg_)
{
    //  Nothing to do; just keep the compiler quiet about the parameters.
    LIBZMQ_UNUSED (arg_);
    LIBZMQ_UNUSED (size_);
    LIBZMQ_UNUSED (data_);
}

204
//  Cleans up after a pipe that has gone away: removes it from the
//  subscription tries, queues unsubscriptions for the user where
//  appropriate, forgets every stored pointer to it and finally detaches it
//  from dist.
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
{
    if (manual) {
        //  Remove the pipe from the trie and send corresponding manual
        //  unsubscriptions upstream.
        manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
        //  Remove pipe without actually sending the message as it was taken
        //  care of by the manual call above. subscriptions is the real mtrie,
        //  so the pipe must be removed from there or it will be left over.
        subscriptions.rm (pipe_, stub, (void *) NULL, false);
    } else {
        //  Remove the pipe from the trie. If there are topics that nobody
        //  is interested in anymore, send corresponding unsubscriptions
        //  upstream.
        subscriptions.rm (pipe_, send_unsubscription, this, !verbose_unsubs);
    }

    //  Requests from this pipe may still be waiting in the queue, and
    //  last_pipe may still refer to it. Keeping those pointers would let a
    //  later manual ZMQ_SUBSCRIBE / ZMQ_UNSUBSCRIBE operate on a pipe that
    //  no longer exists, so replace them with NULL. The queue keeps its
    //  length so entries stay aligned with the other pending queues, and a
    //  NULL entry is already what send_unsubscription () enqueues.
    if (last_pipe == pipe_)
        last_pipe = NULL;
    for (size_t i = 0; i != pending_pipes.size (); ++i) {
        if (pending_pipes[i] == pipe_)
            pending_pipes[i] = NULL;
    }

    dist.pipe_terminated (pipe_);
}

224
//  Callback given to subscriptions.match () by xsend (): flags pipe_ in
//  self_'s dist as a recipient of the message being sent.
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, xpub_t *self_)
{
    self_->dist.match (pipe_);
}

229
//  Sends one message part to every pipe subscribed to the message.
//
//  The set of recipients is computed from the first part of a (possibly
//  multi-part) message and reused for all following parts.
//
//  Returns 0 on success. Returns -1 with errno set to EAGAIN when the socket
//  is not lossy and dist.check_hwm () reports that the message cannot be
//  accepted; returns -1 as well when dist.send_to_matching () fails.
int zmq::xpub_t::xsend (msg_t *msg_)
{
    const bool msg_more = (msg_->flags () & msg_t::more) != 0;

    //  For the first part of multi-part message, find the matching pipes.
    if (!more) {
        //  A previous first part that failed to send returns before the
        //  unmatch below, leaving its recipients marked. Clear them so this
        //  attempt does not send to pipes selected for another message.
        dist.unmatch ();

        subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
                             msg_->size (), mark_as_matching, this);
        //  If inverted matching is used, reverse the selection now
        if (options.invert_matching) {
            dist.reverse_match ();
        }
    }

    //  In non-lossy mode refuse the message rather than drop it.
    if (!lossy && !dist.check_hwm ()) {
        errno = EAGAIN;
        return -1;
    }

    if (dist.send_to_matching (msg_) != 0)
        return -1;

    //  If we are at the end of multi-part message we can mark
    //  all the pipes as non-matching.
    if (!msg_more)
        dist.unmatch ();
    more = msg_more;
    return 0;
}

bool zmq::xpub_t::xhas_out ()
{
260
    return dist.has_out ();
261 262
}

263
//  Hands the oldest queued notification or upstream message to the user.
//
//  msg_ is closed and re-initialised to hold a copy of the queued payload,
//  then receives the queued metadata (if any) and flags.
//
//  Returns 0 on success, or -1 with errno set to EAGAIN when nothing is
//  queued.
int zmq::xpub_t::xrecv (msg_t *msg_)
{
    //  Without at least one queued entry there is nothing to deliver.
    if (pending_data.empty ()) {
        errno = EAGAIN;
        return -1;
    }

    //  In manual mode, remember which pipe the request came from so that a
    //  following ZMQ_SUBSCRIBE / ZMQ_UNSUBSCRIBE acts on it, and take that
    //  pipe off the deque.
    if (manual && !pending_pipes.empty ()) {
        last_pipe = pending_pipes.front ();
        pending_pipes.pop_front ();
    }

    //  Rebuild the caller's message around a copy of the queued payload.
    int rc = msg_->close ();
    errno_assert (rc == 0);
    const blob_t &payload = pending_data.front ();
    rc = msg_->init_size (payload.size ());
    errno_assert (rc == 0);
    memcpy (msg_->data (), payload.data (), payload.size ());

    //  Attach metadata only when the queued entry carries some.
    metadata_t *const meta = pending_metadata.front ();
    if (meta) {
        msg_->set_metadata (meta);
        //  Give back the reference that was taken when the entry was queued.
        meta->drop_ref ();
    }

    msg_->set_flags (pending_flags.front ());

    //  The entry has been delivered; remove it from all three queues.
    pending_data.pop_front ();
    pending_metadata.pop_front ();
    pending_flags.pop_front ();
    return 0;
}

bool zmq::xpub_t::xhas_in ()
{
300
    return !pending_data.empty ();
301 302
}

303
void zmq::xpub_t::send_unsubscription (zmq::mtrie_t::prefix_t data_,
304
                                       size_t size_,
305
                                       xpub_t *self_)
306
{
307
    if (self_->options.type != ZMQ_PUB) {
308 309
        //  Place the unsubscription to the queue of pending (un)subscriptions
        //  to be retrieved by the user later on.
310
        blob_t unsub (size_ + 1);
311
        *unsub.data () = 0;
Martin Hurton's avatar
Martin Hurton committed
312
        if (size_ > 0)
313
            memcpy (unsub.data () + 1, data_, size_);
314 315 316
        self_->pending_data.ZMQ_PUSH_OR_EMPLACE_BACK (ZMQ_MOVE (unsub));
        self_->pending_metadata.push_back (NULL);
        self_->pending_flags.push_back (0);
317

318 319 320
        if (self_->manual) {
            self_->last_pipe = NULL;
            self_->pending_pipes.push_back (NULL);
321
        }
322
    }
323
}