/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31 32
#include <string.h>

33 34
#include "xpub.hpp"
#include "pipe.hpp"
35 36
#include "err.hpp"
#include "msg.hpp"
37
#include "macros.hpp"
38

39 40
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    socket_base_t (parent_, tid_, sid_),
41 42
    verbose_subs (false),
    verbose_unsubs (false),
Martin Hurton's avatar
Martin Hurton committed
43
    more (false),
44
    lossy (true),
45
    manual (false),
46
    pending_pipes (),
47
    welcome_msg ()
48
{
49 50
    last_pipe = NULL;
    options.type = ZMQ_XPUB;
51
    welcome_msg.init ();
52 53 54 55
}

zmq::xpub_t::~xpub_t ()
{
56
    welcome_msg.close ();
57 58
}

59
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
60
{
61 62
    zmq_assert (pipe_);
    dist.attach (pipe_);
63

64
    //  If subscribe_to_all_ is specified, the caller would like to subscribe
65
    //  to all data on this pipe, implicitly.
66
    if (subscribe_to_all_)
67
        subscriptions.add (NULL, 0, pipe_);
somdoron's avatar
somdoron committed
68

69 70
    // if welcome message exists, send a copy of it
    if (welcome_msg.size () > 0)
71 72
    {
        msg_t copy;
73 74 75 76 77 78
        copy.init ();
        int rc = copy.copy (welcome_msg);
        errno_assert (rc == 0);
        bool ok = pipe_->write (&copy);
        zmq_assert (ok);
        pipe_->flush ();
79
    }
80

81 82 83
    //  The pipe is active when attached. Let's read the subscriptions from
    //  it, if any.
    xread_activated (pipe_);
84 85 86 87
}

void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{
88 89
    //  There are some subscriptions waiting. Let's process them.
    msg_t sub;
90
    while (pipe_->read (&sub)) {
91 92
        //  Apply the subscription to the trie
        unsigned char *const data = (unsigned char *) sub.data ();
93
        const size_t size = sub.size ();
94
        metadata_t* metadata = sub.metadata();
95 96 97
        if (size > 0 && (*data == 0 || *data == 1)) {
            if (manual)
            {
98 99
                // Store manual subscription to use on termination
                if (*data == 0)
100
                    manual_subscriptions.rm (data + 1, size - 1, pipe_);
101
                else
102
                    manual_subscriptions.add (data + 1, size - 1, pipe_);
103

104 105
                pending_pipes.push_back (pipe_);
                pending_data.push_back (blob_t (data, size));
106
                if (metadata)
107 108 109
                    metadata->add_ref ();
                pending_metadata.push_back (metadata);
                pending_flags.push_back (0);
110 111 112 113 114
            }
            else
            {
                bool unique;
                if (*data == 0)
115
                    unique = subscriptions.rm (data + 1, size - 1, pipe_);
116
                else
117
                    unique = subscriptions.add (data + 1, size - 1, pipe_);
118 119 120 121 122 123

                //  If the (un)subscription is not a duplicate store it so that it can be
                //  passed to the user on next recv call unless verbose mode is enabled
                //  which makes to pass always these messages.
                if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) ||
                        (*data == 0 && verbose_unsubs && verbose_subs))) {
124
                    pending_data.push_back (blob_t(data, size));
125
                    if (metadata)
126 127 128
                        metadata->add_ref ();
                    pending_metadata.push_back (metadata);
                    pending_flags.push_back (0);
129 130
                }
            }
131
        }
132
        else {
133
            //  Process user message coming upstream from xsub socket
134
            pending_data.push_back (blob_t (data, size));
135
            if (metadata)
136
                metadata->add_ref ();
137
            pending_metadata.push_back (metadata);
138 139
            pending_flags.push_back (sub.flags ());
        }
140
        sub.close ();
141
    }
142 143
}

144
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
145 146 147 148
{
    dist.activated (pipe_);
}

Pieter Hintjens's avatar
Pieter Hintjens committed
149 150
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
    size_t optvallen_)
151
{
152 153 154 155
    if (option_ == ZMQ_XPUB_VERBOSE
     || option_ == ZMQ_XPUB_VERBOSER
     || option_ == ZMQ_XPUB_NODROP
     || option_ == ZMQ_XPUB_MANUAL) {
156
        if (optvallen_ != sizeof (int) || *static_cast <const int*> (optval_) < 0) {
157 158 159
            errno = EINVAL;
            return -1;
        }
160
        if (option_ == ZMQ_XPUB_VERBOSE) {
161
            verbose_subs = (*static_cast <const int*> (optval_) != 0);
162 163
            verbose_unsubs = 0;
        }
164
        else
165 166 167 168
        if (option_ == ZMQ_XPUB_VERBOSER) {
            verbose_subs = (*static_cast <const int*> (optval_) != 0);
            verbose_unsubs = verbose_subs;
        }
169 170 171 172 173 174 175 176
        else
        if (option_ == ZMQ_XPUB_NODROP)
            lossy = (*static_cast <const int*> (optval_) == 0);
        else
        if (option_ == ZMQ_XPUB_MANUAL)
            manual = (*static_cast <const int*> (optval_) != 0);
    }
    else
177
    if (option_ == ZMQ_SUBSCRIBE && manual) {
178
        if (last_pipe != NULL)
179
            subscriptions.add ((unsigned char *) optval_, optvallen_, last_pipe);
180
    }
181
    else
182
    if (option_ == ZMQ_UNSUBSCRIBE && manual) {
183
        if (last_pipe != NULL)
184
            subscriptions.rm ((unsigned char *) optval_, optvallen_, last_pipe);
185
    }
186 187
    else
    if (option_ == ZMQ_XPUB_WELCOME_MSG) {
188
        welcome_msg.close ();
189 190

        if (optvallen_ > 0) {
191
            int rc = welcome_msg.init_size (optvallen_);
192
            errno_assert(rc == 0);
193

194 195
            unsigned char *data = (unsigned char*) welcome_msg.data ();
            memcpy (data, optval_, optvallen_);
196 197
        }
        else
198
            welcome_msg.init ();
199
    }
200 201 202 203
    else {
        errno = EINVAL;
        return -1;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
204 205 206
    return 0;
}

207 208 209 210 211 212 213
static void stub (unsigned char *data_, size_t size_, void *arg_)
{
    LIBZMQ_UNUSED(data_);
    LIBZMQ_UNUSED(size_);
    LIBZMQ_UNUSED(arg_);
}

214
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
215
{
216 217 218 219 220
    if (manual)
    {
        //  Remove the pipe from the trie and send corresponding manual
        //  unsubscriptions upstream.
        manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
221 222 223 224
        //  Remove pipe without actually sending the message as it was taken
        //  care of by the manual call above. subscriptions is the real mtrie,
        //  so the pipe must be removed from there or it will be left over.
        subscriptions.rm (pipe_, stub, NULL, false);
225 226 227 228 229 230 231 232
    }
    else
    {
        //  Remove the pipe from the trie. If there are topics that nobody
        //  is interested in anymore, send corresponding unsubscriptions
        //  upstream.
        subscriptions.rm (pipe_, send_unsubscription, this, !verbose_unsubs);
    }
233

234
    dist.pipe_terminated (pipe_);
235 236
}

237 238 239 240 241 242
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
{
    xpub_t *self = (xpub_t*) arg_;
    self->dist.match (pipe_);
}

243
int zmq::xpub_t::xsend (msg_t *msg_)
244
{
245
    bool msg_more = msg_->flags () & msg_t::more ? true : false;
246 247

    //  For the first part of multi-part message, find the matching pipes.
248
    if (!more) {
249 250
        subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
            mark_as_matching, this);
251 252 253 254 255
        // If inverted matching is used, reverse the selection now
        if (options.invert_matching) {
            dist.reverse_match();
        }
    }
256

257 258 259
    int rc = -1;            //  Assume we fail
    if (lossy || dist.check_hwm ()) {
        if (dist.send_to_matching (msg_) == 0) {
260
            //  If we are at the end of multi-part message we can mark
261 262 263 264 265 266
            //  all the pipes as non-matching.
            if (!msg_more)
                dist.unmatch ();
            more = msg_more;
            rc = 0;         //  Yay, sent successfully
        }
267
    }
268 269 270
    else
        errno = EAGAIN;
    return rc;
271 272 273 274
}

bool zmq::xpub_t::xhas_out ()
{
275
    return dist.has_out ();
276 277
}

278
int zmq::xpub_t::xrecv (msg_t *msg_)
279
{
Martin Hurton's avatar
Martin Hurton committed
280
    //  If there is at least one
281
    if (pending_data.empty ()) {
282 283 284
        errno = EAGAIN;
        return -1;
    }
285

286
    // User is reading a message, set last_pipe and remove it from the deque
287 288 289 290
    if (manual && !pending_pipes.empty ()) {
        last_pipe = pending_pipes.front ();
        pending_pipes.pop_front ();
    }
291

292 293
    int rc = msg_->close ();
    errno_assert (rc == 0);
294
    rc = msg_->init_size (pending_data.front ().size ());
295
    errno_assert (rc == 0);
296 297 298
    memcpy (msg_->data (),
        pending_data.front ().data (),
        pending_data.front ().size ());
299 300 301 302

    // set metadata only if there is some
    if (metadata_t* metadata = pending_metadata.front ()) {
        msg_->set_metadata (metadata);
303 304
        // Remove ref corresponding to vector placement
        metadata->drop_ref();
305
    }
306

307 308
    msg_->set_flags (pending_flags.front ());
    pending_data.pop_front ();
309
    pending_metadata.pop_front ();
310
    pending_flags.pop_front ();
311
    return 0;
312 313 314 315
}

bool zmq::xpub_t::xhas_in ()
{
316
    return !pending_data.empty ();
317 318 319 320 321 322 323 324
}

void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
    void *arg_)
{
    xpub_t *self = (xpub_t*) arg_;

    if (self->options.type != ZMQ_PUB) {
325 326
        //  Place the unsubscription to the queue of pending (un)subscriptions
        //  to be retrieved by the user later on.
327 328
        blob_t unsub (size_ + 1);
        *unsub.data() = 0;
Martin Hurton's avatar
Martin Hurton committed
329
        if (size_ > 0)
330 331
            memcpy (unsub.data() + 1, data_, size_);
        self->pending_data.ZMQ_PUSH_OR_EMPLACE_BACK (ZMQ_MOVE(unsub));
332
        self->pending_metadata.push_back (NULL);
333
        self->pending_flags.push_back (0);
334 335 336 337 338

        if (self->manual) {
            self->last_pipe = NULL;
            self->pending_pipes.push_back (NULL);
        }
339
    }
340
}