xpub.cpp 10 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31 32
#include <string.h>

33 34
#include "xpub.hpp"
#include "pipe.hpp"
35 36
#include "err.hpp"
#include "msg.hpp"
37

38 39
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    socket_base_t (parent_, tid_, sid_),
40 41
    verbose_subs (false),
    verbose_unsubs (false),
Martin Hurton's avatar
Martin Hurton committed
42
    more (false),
43
    lossy (true),
44
    manual(false),
45
    pending_pipes (),
46
    welcome_msg ()
47
{
48 49 50
    last_pipe = NULL;
    options.type = ZMQ_XPUB;
    welcome_msg.init();
51 52 53 54
}

zmq::xpub_t::~xpub_t ()
{
55
    welcome_msg.close();
56 57
}

58
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
59
{
60 61
    zmq_assert (pipe_);
    dist.attach (pipe_);
62

63
    //  If subscribe_to_all_ is specified, the caller would like to subscribe
64
    //  to all data on this pipe, implicitly.
65
    if (subscribe_to_all_)
66
        subscriptions.add (NULL, 0, pipe_);
somdoron's avatar
somdoron committed
67

68 69 70 71 72 73
    // if welcome message exist
    if (welcome_msg.size() > 0)
    {
        msg_t copy;
        copy.init();
        copy.copy(welcome_msg);
somdoron's avatar
somdoron committed
74

75 76 77
        pipe_->write(&copy);
        pipe_->flush();
    }
78

79 80 81
    //  The pipe is active when attached. Let's read the subscriptions from
    //  it, if any.
    xread_activated (pipe_);
82 83 84 85
}

void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{
86 87
    //  There are some subscriptions waiting. Let's process them.
    msg_t sub;
88
    while (pipe_->read (&sub)) {
89 90
        //  Apply the subscription to the trie
        unsigned char *const data = (unsigned char *) sub.data ();
91
        const size_t size = sub.size ();
92
        metadata_t* metadata = sub.metadata();
93 94 95
        if (size > 0 && (*data == 0 || *data == 1)) {
            if (manual)
            {
96 97 98 99 100 101
                // Store manual subscription to use on termination
                if (*data == 0)
                    manual_subscriptions.rm(data + 1, size - 1, pipe_);
                else
                    manual_subscriptions.add(data + 1, size - 1, pipe_);

102
                pending_pipes.push_back(pipe_);
103
                pending_data.push_back(blob_t(data, size));
104 105 106
                if (metadata)
                    metadata->add_ref();
                pending_metadata.push_back(metadata);
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
                pending_flags.push_back(0);
            }
            else
            {
                bool unique;
                if (*data == 0)
                    unique = subscriptions.rm(data + 1, size - 1, pipe_);
                else
                    unique = subscriptions.add(data + 1, size - 1, pipe_);

                //  If the (un)subscription is not a duplicate store it so that it can be
                //  passed to the user on next recv call unless verbose mode is enabled
                //  which makes to pass always these messages.
                if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) ||
                        (*data == 0 && verbose_unsubs && verbose_subs))) {
                    pending_data.push_back(blob_t(data, size));
123 124 125
                    if (metadata)
                        metadata->add_ref();
                    pending_metadata.push_back(metadata);
126 127 128
                    pending_flags.push_back(0);
                }
            }
129
        }
130
        else {
131
            //  Process user message coming upstream from xsub socket
132
            pending_data.push_back (blob_t (data, size));
133 134 135
            if (metadata)
                metadata->add_ref();
            pending_metadata.push_back (metadata);
136 137
            pending_flags.push_back (sub.flags ());
        }
138
        sub.close ();
139
    }
140 141
}

142
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
143 144 145 146
{
    dist.activated (pipe_);
}

Pieter Hintjens's avatar
Pieter Hintjens committed
147 148
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
    size_t optvallen_)
149
{
150 151 152 153
    if (option_ == ZMQ_XPUB_VERBOSE
     || option_ == ZMQ_XPUB_VERBOSER
     || option_ == ZMQ_XPUB_NODROP
     || option_ == ZMQ_XPUB_MANUAL) {
154 155 156 157
        if (optvallen_ != sizeof(int) || *static_cast <const int*> (optval_) < 0) {
            errno = EINVAL;
            return -1;
        }
158
        if (option_ == ZMQ_XPUB_VERBOSE) {
159
            verbose_subs = (*static_cast <const int*> (optval_) != 0);
160 161
            verbose_unsubs = 0;
        }
162
        else
163 164 165 166
        if (option_ == ZMQ_XPUB_VERBOSER) {
            verbose_subs = (*static_cast <const int*> (optval_) != 0);
            verbose_unsubs = verbose_subs;
        }
167 168 169 170 171 172 173 174
        else
        if (option_ == ZMQ_XPUB_NODROP)
            lossy = (*static_cast <const int*> (optval_) == 0);
        else
        if (option_ == ZMQ_XPUB_MANUAL)
            manual = (*static_cast <const int*> (optval_) != 0);
    }
    else
175
    if (option_ == ZMQ_SUBSCRIBE && manual) {
176 177
        if (last_pipe != NULL)
            subscriptions.add ((unsigned char *)optval_, optvallen_, last_pipe);
178
    }
179
    else
180
    if (option_ == ZMQ_UNSUBSCRIBE && manual) {
181 182
        if (last_pipe != NULL)
            subscriptions.rm ((unsigned char *)optval_, optvallen_, last_pipe);
183
    }
184 185 186 187 188
    else
    if (option_ == ZMQ_XPUB_WELCOME_MSG) {
        welcome_msg.close();

        if (optvallen_ > 0) {
189
            int rc = welcome_msg.init_size(optvallen_);
190
            errno_assert(rc == 0);
191 192 193 194 195 196 197

            unsigned char *data = (unsigned char*)welcome_msg.data();
            memcpy(data, optval_, optvallen_);
        }
        else
            welcome_msg.init();
    }
198 199 200 201
    else {
        errno = EINVAL;
        return -1;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
202 203 204
    return 0;
}

205
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
206
{
207 208 209 210 211 212 213 214 215 216 217 218 219
    if (manual)
    {
        //  Remove the pipe from the trie and send corresponding manual
        //  unsubscriptions upstream.
        manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
    }
    else
    {
        //  Remove the pipe from the trie. If there are topics that nobody
        //  is interested in anymore, send corresponding unsubscriptions
        //  upstream.
        subscriptions.rm (pipe_, send_unsubscription, this, !verbose_unsubs);
    }
220

221
    dist.pipe_terminated (pipe_);
222 223
}

224 225 226 227 228 229
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
{
    xpub_t *self = (xpub_t*) arg_;
    self->dist.match (pipe_);
}

230
int zmq::xpub_t::xsend (msg_t *msg_)
231
{
232
    bool msg_more = msg_->flags () & msg_t::more ? true : false;
233 234

    //  For the first part of multi-part message, find the matching pipes.
235
    if (!more) {
236 237
        subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
            mark_as_matching, this);
238 239 240 241 242
        // If inverted matching is used, reverse the selection now
        if (options.invert_matching) {
            dist.reverse_match();
        }
    }
243

244 245 246
    int rc = -1;            //  Assume we fail
    if (lossy || dist.check_hwm ()) {
        if (dist.send_to_matching (msg_) == 0) {
247
            //  If we are at the end of multi-part message we can mark
248 249 250 251 252 253
            //  all the pipes as non-matching.
            if (!msg_more)
                dist.unmatch ();
            more = msg_more;
            rc = 0;         //  Yay, sent successfully
        }
254
    }
255 256 257
    else
        errno = EAGAIN;
    return rc;
258 259 260 261
}

bool zmq::xpub_t::xhas_out ()
{
262
    return dist.has_out ();
263 264
}

265
int zmq::xpub_t::xrecv (msg_t *msg_)
266
{
Martin Hurton's avatar
Martin Hurton committed
267
    //  If there is at least one
268
    if (pending_data.empty ()) {
269 270 271
        errno = EAGAIN;
        return -1;
    }
272

273
    // User is reading a message, set last_pipe and remove it from the deque
274 275 276 277
    if (manual && !pending_pipes.empty ()) {
        last_pipe = pending_pipes.front ();
        pending_pipes.pop_front ();
    }
278

279 280
    int rc = msg_->close ();
    errno_assert (rc == 0);
281
    rc = msg_->init_size (pending_data.front ().size ());
282
    errno_assert (rc == 0);
283 284 285
    memcpy (msg_->data (),
        pending_data.front ().data (),
        pending_data.front ().size ());
286 287 288 289

    // set metadata only if there is some
    if (metadata_t* metadata = pending_metadata.front ()) {
        msg_->set_metadata (metadata);
290 291
        // Remove ref corresponding to vector placement
        metadata->drop_ref();
292
    }
293

294 295
    msg_->set_flags (pending_flags.front ());
    pending_data.pop_front ();
296
    pending_metadata.pop_front ();
297
    pending_flags.pop_front ();
298
    return 0;
299 300 301 302
}

bool zmq::xpub_t::xhas_in ()
{
303
    return !pending_data.empty ();
304 305 306 307 308 309 310 311
}

void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
    void *arg_)
{
    xpub_t *self = (xpub_t*) arg_;

    if (self->options.type != ZMQ_PUB) {
312 313
        //  Place the unsubscription to the queue of pending (un)subscriptions
        //  to be retrieved by the user later on.
314 315
        blob_t unsub (size_ + 1, 0);
        unsub [0] = 0;
Martin Hurton's avatar
Martin Hurton committed
316 317
        if (size_ > 0)
            memcpy (&unsub [1], data_, size_);
318
        self->pending_data.push_back (unsub);
319
        self->pending_metadata.push_back (NULL);
320
        self->pending_flags.push_back (0);
321 322 323 324 325

        if (self->manual) {
            self->last_pipe = NULL;
            self->pending_pipes.push_back (NULL);
        }
326
    }
327
}