xpub.cpp 9.69 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31 32
#include <string.h>

33 34
#include "xpub.hpp"
#include "pipe.hpp"
35 36
#include "err.hpp"
#include "msg.hpp"
37

38 39
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    socket_base_t (parent_, tid_, sid_),
40 41
    verbose_subs (false),
    verbose_unsubs (false),
Martin Hurton's avatar
Martin Hurton committed
42
    more (false),
43
    lossy (true),
44
    manual(false),
45
    pending_pipes (),
46
    welcome_msg ()
47
{
48 49 50
    last_pipe = NULL;
    options.type = ZMQ_XPUB;
    welcome_msg.init();
51 52 53 54
}

zmq::xpub_t::~xpub_t ()
{
55
    welcome_msg.close();
56 57
}

58
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
59
{
60 61
    zmq_assert (pipe_);
    dist.attach (pipe_);
62

63
    //  If subscribe_to_all_ is specified, the caller would like to subscribe
64
    //  to all data on this pipe, implicitly.
65
    if (subscribe_to_all_)
66
        subscriptions.add (NULL, 0, pipe_);
somdoron's avatar
somdoron committed
67

68 69 70 71 72 73
    // if welcome message exist
    if (welcome_msg.size() > 0)
    {
        msg_t copy;
        copy.init();
        copy.copy(welcome_msg);
somdoron's avatar
somdoron committed
74

75 76 77
        pipe_->write(&copy);
        pipe_->flush();
    }
78

79 80 81
    //  The pipe is active when attached. Let's read the subscriptions from
    //  it, if any.
    xread_activated (pipe_);
82 83 84 85
}

void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{
86 87
    //  There are some subscriptions waiting. Let's process them.
    msg_t sub;
88
    while (pipe_->read (&sub)) {
89 90
        //  Apply the subscription to the trie
        unsigned char *const data = (unsigned char *) sub.data ();
91
        const size_t size = sub.size ();
92 93 94
        if (size > 0 && (*data == 0 || *data == 1)) {
            if (manual)
            {
95 96 97 98 99 100
                // Store manual subscription to use on termination
                if (*data == 0)
                    manual_subscriptions.rm(data + 1, size - 1, pipe_);
                else
                    manual_subscriptions.add(data + 1, size - 1, pipe_);

101
                pending_pipes.push_back(pipe_);
102
                pending_data.push_back(blob_t(data, size));
103
                pending_metadata.push_back(sub.metadata());
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
                pending_flags.push_back(0);
            }
            else
            {
                bool unique;
                if (*data == 0)
                    unique = subscriptions.rm(data + 1, size - 1, pipe_);
                else
                    unique = subscriptions.add(data + 1, size - 1, pipe_);

                //  If the (un)subscription is not a duplicate store it so that it can be
                //  passed to the user on next recv call unless verbose mode is enabled
                //  which makes to pass always these messages.
                if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) ||
                        (*data == 0 && verbose_unsubs && verbose_subs))) {
                    pending_data.push_back(blob_t(data, size));
120
                    pending_metadata.push_back(sub.metadata());
121 122 123
                    pending_flags.push_back(0);
                }
            }
124
        }
125
        else {
126
            //  Process user message coming upstream from xsub socket
127
            pending_data.push_back (blob_t (data, size));
128
            pending_metadata.push_back (sub.metadata ());
129 130
            pending_flags.push_back (sub.flags ());
        }
131
        sub.close ();
132
    }
133 134
}

135
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
136 137 138 139
{
    dist.activated (pipe_);
}

Pieter Hintjens's avatar
Pieter Hintjens committed
140 141
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
    size_t optvallen_)
142
{
143 144 145 146
    if (option_ == ZMQ_XPUB_VERBOSE
     || option_ == ZMQ_XPUB_VERBOSER
     || option_ == ZMQ_XPUB_NODROP
     || option_ == ZMQ_XPUB_MANUAL) {
147 148 149 150
        if (optvallen_ != sizeof(int) || *static_cast <const int*> (optval_) < 0) {
            errno = EINVAL;
            return -1;
        }
151
        if (option_ == ZMQ_XPUB_VERBOSE) {
152
            verbose_subs = (*static_cast <const int*> (optval_) != 0);
153 154
            verbose_unsubs = 0;
        }
155
        else
156 157 158 159
        if (option_ == ZMQ_XPUB_VERBOSER) {
            verbose_subs = (*static_cast <const int*> (optval_) != 0);
            verbose_unsubs = verbose_subs;
        }
160 161 162 163 164 165 166 167
        else
        if (option_ == ZMQ_XPUB_NODROP)
            lossy = (*static_cast <const int*> (optval_) == 0);
        else
        if (option_ == ZMQ_XPUB_MANUAL)
            manual = (*static_cast <const int*> (optval_) != 0);
    }
    else
168
    if (option_ == ZMQ_SUBSCRIBE && manual) {
169 170
        if (last_pipe != NULL)
            subscriptions.add ((unsigned char *)optval_, optvallen_, last_pipe);
171
    }
172
    else
173
    if (option_ == ZMQ_UNSUBSCRIBE && manual) {
174 175
        if (last_pipe != NULL)
            subscriptions.rm ((unsigned char *)optval_, optvallen_, last_pipe);
176
    }
177 178 179 180 181
    else
    if (option_ == ZMQ_XPUB_WELCOME_MSG) {
        welcome_msg.close();

        if (optvallen_ > 0) {
182
            int rc = welcome_msg.init_size(optvallen_);
183
            errno_assert(rc == 0);
184 185 186 187 188 189 190

            unsigned char *data = (unsigned char*)welcome_msg.data();
            memcpy(data, optval_, optvallen_);
        }
        else
            welcome_msg.init();
    }
191 192 193 194
    else {
        errno = EINVAL;
        return -1;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
195 196 197
    return 0;
}

198
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
199
{
200 201 202 203 204 205 206 207 208 209 210 211 212
    if (manual)
    {
        //  Remove the pipe from the trie and send corresponding manual
        //  unsubscriptions upstream.
        manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
    }
    else
    {
        //  Remove the pipe from the trie. If there are topics that nobody
        //  is interested in anymore, send corresponding unsubscriptions
        //  upstream.
        subscriptions.rm (pipe_, send_unsubscription, this, !verbose_unsubs);
    }
213

214
    dist.pipe_terminated (pipe_);
215 216
}

217 218 219 220 221 222
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
{
    xpub_t *self = (xpub_t*) arg_;
    self->dist.match (pipe_);
}

223
int zmq::xpub_t::xsend (msg_t *msg_)
224
{
225
    bool msg_more = msg_->flags () & msg_t::more ? true : false;
226 227

    //  For the first part of multi-part message, find the matching pipes.
228
    if (!more) {
229 230
        subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
            mark_as_matching, this);
231 232 233 234 235
        // If inverted matching is used, reverse the selection now
        if (options.invert_matching) {
            dist.reverse_match();
        }
    }
236

237 238 239
    int rc = -1;            //  Assume we fail
    if (lossy || dist.check_hwm ()) {
        if (dist.send_to_matching (msg_) == 0) {
240
            //  If we are at the end of multi-part message we can mark
241 242 243 244 245 246
            //  all the pipes as non-matching.
            if (!msg_more)
                dist.unmatch ();
            more = msg_more;
            rc = 0;         //  Yay, sent successfully
        }
247
    }
248 249 250
    else
        errno = EAGAIN;
    return rc;
251 252 253 254
}

bool zmq::xpub_t::xhas_out ()
{
255
    return dist.has_out ();
256 257
}

258
int zmq::xpub_t::xrecv (msg_t *msg_)
259
{
Martin Hurton's avatar
Martin Hurton committed
260
    //  If there is at least one
261
    if (pending_data.empty ()) {
262 263 264
        errno = EAGAIN;
        return -1;
    }
265

266
    // User is reading a message, set last_pipe and remove it from the deque
267 268 269 270
    if (manual && !pending_pipes.empty ()) {
        last_pipe = pending_pipes.front ();
        pending_pipes.pop_front ();
    }
271

272 273
    int rc = msg_->close ();
    errno_assert (rc == 0);
274
    rc = msg_->init_size (pending_data.front ().size ());
275
    errno_assert (rc == 0);
276 277 278
    memcpy (msg_->data (),
        pending_data.front ().data (),
        pending_data.front ().size ());
279 280 281 282 283

    // set metadata only if there is some
    if (metadata_t* metadata = pending_metadata.front ()) {
        msg_->set_metadata (metadata);
    }
284

285 286
    msg_->set_flags (pending_flags.front ());
    pending_data.pop_front ();
287
    pending_metadata.pop_front ();
288
    pending_flags.pop_front ();
289
    return 0;
290 291 292 293
}

bool zmq::xpub_t::xhas_in ()
{
294
    return !pending_data.empty ();
295 296 297 298 299 300 301 302
}

void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
    void *arg_)
{
    xpub_t *self = (xpub_t*) arg_;

    if (self->options.type != ZMQ_PUB) {
303 304
        //  Place the unsubscription to the queue of pending (un)subscriptions
        //  to be retrieved by the user later on.
305 306
        blob_t unsub (size_ + 1, 0);
        unsub [0] = 0;
Martin Hurton's avatar
Martin Hurton committed
307 308
        if (size_ > 0)
            memcpy (&unsub [1], data_, size_);
309
        self->pending_data.push_back (unsub);
310
        self->pending_metadata.push_back (NULL);
311
        self->pending_flags.push_back (0);
312 313 314 315 316

        if (self->manual) {
            self->last_pipe = NULL;
            self->pending_pipes.push_back (NULL);
        }
317
    }
318
}