xpub.cpp 9.19 KB
Newer Older
1
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31 32
#include <string.h>

33 34
#include "xpub.hpp"
#include "pipe.hpp"
35 36
#include "err.hpp"
#include "msg.hpp"
37

38 39
//  Constructs the socket object on top of socket_base_t.
//
//  Initial state: both verbosity switches and manual mode are off, no
//  multi-part send is in progress, the socket is lossy, and no pipe is
//  remembered as the last one read from.
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    socket_base_t (parent_, tid_, sid_),
    verbose_subs (false),
    verbose_unsubs (false),
    more (false),
    lossy (true),
    manual (false),
    pending_pipes (),
    welcome_msg ()
{
    options.type = ZMQ_XPUB;
    last_pipe = NULL;

    //  Put the welcome message into its initial state; xattach_pipe only
    //  sends it when its size is non-zero.
    welcome_msg.init ();
}

//  Destroys the socket object, closing the stored welcome message.
zmq::xpub_t::~xpub_t ()
{
    welcome_msg.close ();
}

58
//  Called when a new pipe is attached to the socket.
//
//  Registers the pipe with 'dist', optionally subscribes it to everything,
//  sends it a copy of the welcome message (when one of non-zero size is
//  set) and finally processes whatever the pipe already has queued.
//
//  pipe_              the pipe being attached; must not be NULL.
//  subscribe_to_all_  when true, an empty-prefix subscription is added
//                     for this pipe.
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
    zmq_assert (pipe_);
    dist.attach (pipe_);

    //  If subscribe_to_all_ is specified, the caller would like to subscribe
    //  to all data on this pipe, implicitly.
    if (subscribe_to_all_)
        subscriptions.add (NULL, 0, pipe_);

    //  If a welcome message is set, send a copy of it down the new pipe.
    if (welcome_msg.size () > 0) {
        msg_t copy;
        int rc = copy.init ();
        errno_assert (rc == 0);
        rc = copy.copy (welcome_msg);
        errno_assert (rc == 0);

        //  NOTE(review): presumably the pipe takes over the message only
        //  when write () succeeds - confirm against pipe_t. If the write is
        //  refused, release our copy so that it is not left unclosed.
        if (!pipe_->write (&copy)) {
            rc = copy.close ();
            errno_assert (rc == 0);
        }
        pipe_->flush ();
    }

    //  The pipe is active when attached. Let's read the subscriptions from
    //  it, if any.
    xread_activated (pipe_);
}

//  Drains every message currently readable from 'pipe_'.
//
//  A message whose first byte is 1 is treated as a subscription and one
//  whose first byte is 0 as an unsubscription; the remaining bytes are the
//  topic. In manual mode such a message is only queued for the user,
//  together with the pipe it came from. Otherwise it is applied to the
//  subscription trie and queued for the user when it should be reported.
//  Any other message is queued for the user with its original flags.
void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{
    //  There are some subscriptions waiting. Let's process them.
    msg_t sub;
    while (pipe_->read (&sub)) {
        unsigned char *const data =
            static_cast <unsigned char *> (sub.data ());
        const size_t size = sub.size ();
        const bool is_subscribe = size > 0 && *data == 1;
        const bool is_cancel = size > 0 && *data == 0;

        //  NOTE(review): the metadata pointer is queued below as-is while
        //  sub.close () runs at the end of the iteration - confirm that the
        //  metadata object is guaranteed to outlive the queued entry.

        if (is_subscribe || is_cancel) {
            if (manual) {
                //  Manual mode: leave the trie alone; remember which pipe
                //  sent the request so that the user can act on it.
                pending_pipes.push_back (pipe_);
                pending_data.push_back (blob_t (data, size));
                pending_metadata.push_back (sub.metadata ());
                pending_flags.push_back (0);
            }
            else {
                //  Apply the (un)subscription to the trie
                bool unique;
                if (is_cancel)
                    unique = subscriptions.rm (data + 1, size - 1, pipe_);
                else
                    unique = subscriptions.add (data + 1, size - 1, pipe_);

                //  If the (un)subscription is not a duplicate, store it so
                //  that it can be passed to the user on the next recv call.
                //  The verbose modes pass these messages on always.
                const bool notify = unique
                    || (is_subscribe && verbose_subs)
                    || (is_cancel && verbose_unsubs && verbose_subs);
                if (options.type == ZMQ_XPUB && notify) {
                    pending_data.push_back (blob_t (data, size));
                    pending_metadata.push_back (sub.metadata ());
                    pending_flags.push_back (0);
                }
            }
        }
        else
        if (options.type != ZMQ_PUB) {
            //  Process user message coming upstream from xsub socket.
            //  Nothing is queued for ZMQ_PUB-typed sockets, matching
            //  send_unsubscription, so the queues cannot grow unread.
            //  NOTE(review): assumes a ZMQ_PUB socket never hands queued
            //  messages to the user - confirm against the PUB socket class.
            pending_data.push_back (blob_t (data, size));
            pending_metadata.push_back (sub.metadata ());
            pending_flags.push_back (sub.flags ());
        }
        sub.close ();
    }
}

129
//  Called when 'pipe_' becomes writable again; the notification is simply
//  passed on to 'dist'.
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
{
    dist.activated (pipe_);
}

Pieter Hintjens's avatar
Pieter Hintjens committed
134 135
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
    size_t optvallen_)
136
{
137 138 139 140
    if (option_ == ZMQ_XPUB_VERBOSE
     || option_ == ZMQ_XPUB_VERBOSER
     || option_ == ZMQ_XPUB_NODROP
     || option_ == ZMQ_XPUB_MANUAL) {
141 142 143 144
        if (optvallen_ != sizeof(int) || *static_cast <const int*> (optval_) < 0) {
            errno = EINVAL;
            return -1;
        }
145
        if (option_ == ZMQ_XPUB_VERBOSE) {
146
            verbose_subs = (*static_cast <const int*> (optval_) != 0);
147 148
            verbose_unsubs = 0;
        }
149
        else
150 151 152 153
        if (option_ == ZMQ_XPUB_VERBOSER) {
            verbose_subs = (*static_cast <const int*> (optval_) != 0);
            verbose_unsubs = verbose_subs;
        }
154 155 156 157 158 159 160 161
        else
        if (option_ == ZMQ_XPUB_NODROP)
            lossy = (*static_cast <const int*> (optval_) == 0);
        else
        if (option_ == ZMQ_XPUB_MANUAL)
            manual = (*static_cast <const int*> (optval_) != 0);
    }
    else
162
    if (option_ == ZMQ_SUBSCRIBE && manual) {
163 164
        if (last_pipe != NULL)
            subscriptions.add ((unsigned char *)optval_, optvallen_, last_pipe);
165
    }
166
    else
167
    if (option_ == ZMQ_UNSUBSCRIBE && manual) {
168 169
        if (last_pipe != NULL)
            subscriptions.rm ((unsigned char *)optval_, optvallen_, last_pipe);
170
    }
171 172 173 174 175
    else
    if (option_ == ZMQ_XPUB_WELCOME_MSG) {
        welcome_msg.close();

        if (optvallen_ > 0) {
176
            int rc = welcome_msg.init_size(optvallen_);
177
            errno_assert(rc == 0);
178 179 180 181 182 183 184

            unsigned char *data = (unsigned char*)welcome_msg.data();
            memcpy(data, optval_, optvallen_);
        }
        else
            welcome_msg.init();
    }
185 186 187 188
    else {
        errno = EINVAL;
        return -1;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
189 190 191
    return 0;
}

192
//  Called when 'pipe_' has been terminated. Removes every trace of the
//  pipe from the subscription trie, from the manual-mode bookkeeping and
//  from 'dist'.
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
{
    //  Remove the pipe from the trie. If there are topics that nobody
    //  is interested in anymore, send corresponding unsubscriptions
    //  upstream.
    subscriptions.rm (pipe_, send_unsubscription, this,
        !(verbose_unsubs || manual));

    //  Forget the pipe in the manual-mode bookkeeping too. Otherwise a
    //  later manual ZMQ_SUBSCRIBE/ZMQ_UNSUBSCRIBE would hand a pointer to
    //  the terminated pipe to the trie. NULL is already the "no pipe"
    //  value in this file (queued by send_unsubscription, checked in
    //  xsetsockopt).
    //  NOTE(review): assumes pending_pipes supports indexed access
    //  (e.g. std::deque) - confirm against the class declaration.
    if (last_pipe == pipe_)
        last_pipe = NULL;
    for (size_t i = 0; i != pending_pipes.size (); ++i)
        if (pending_pipes [i] == pipe_)
            pending_pipes [i] = NULL;

    dist.pipe_terminated (pipe_);
}

202 203 204 205 206 207
//  Callback handed to subscriptions.match () by xsend. 'arg_' carries the
//  xpub_t instance; the given pipe is marked as matching in its 'dist'.
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
{
    static_cast <xpub_t*> (arg_)->dist.match (pipe_);
}

208
//  Sends one message part to every pipe whose subscriptions match the
//  message (or to the non-matching ones when options.invert_matching is
//  set). The set of target pipes is computed on the first part of a
//  message and reused for the remaining parts.
//
//  Returns 0 on success and -1 on failure. When the socket is not lossy
//  and dist.check_hwm () fails, errno is set to EAGAIN.
int zmq::xpub_t::xsend (msg_t *msg_)
{
    const bool msg_more = (msg_->flags () & msg_t::more) != 0;

    //  For the first part of multi-part message, find the matching pipes.
    if (!more) {
        //  Make sure nothing is left matched from an earlier attempt that
        //  failed before reaching unmatch (). Stale marks would leak into
        //  this selection, and with inverted matching they would be
        //  reversed a second time.
        dist.unmatch ();

        subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
            mark_as_matching, this);
        //  If inverted matching is used, reverse the selection now
        if (options.invert_matching) {
            dist.reverse_match ();
        }
    }

    int rc = -1;            //  Assume we fail
    if (lossy || dist.check_hwm ()) {
        if (dist.send_to_matching (msg_) == 0) {
            //  If we are at the end of multi-part message we can mark
            //  all the pipes as non-matching.
            if (!msg_more)
                dist.unmatch ();
            more = msg_more;
            rc = 0;         //  Yay, sent successfully
        }
    }
    else
        errno = EAGAIN;
    return rc;
}

bool zmq::xpub_t::xhas_out ()
{
240
    return dist.has_out ();
241 242
}

243
//  Hands the oldest queued message ((un)subscription or upstream user
//  message) to the caller.
//
//  'msg_' is closed and re-initialised with a copy of the queued payload,
//  its metadata (if any) and its flags. In manual mode the front entry of
//  pending_pipes is popped into last_pipe.
//
//  Returns 0 on success; returns -1 with errno set to EAGAIN when nothing
//  is queued.
int zmq::xpub_t::xrecv (msg_t *msg_)
{
    //  If there is nothing queued, tell the caller to try again later.
    if (pending_data.empty ()) {
        errno = EAGAIN;
        return -1;
    }

    //  User is reading a message, set last_pipe and remove it from the deque
    if (manual && !pending_pipes.empty ()) {
        last_pipe = pending_pipes.front ();
        pending_pipes.pop_front ();
    }

    int rc = msg_->close ();
    errno_assert (rc == 0);

    //  Copy the queued payload into a freshly sized message.
    const blob_t &payload = pending_data.front ();
    rc = msg_->init_size (payload.size ());
    errno_assert (rc == 0);
    memcpy (msg_->data (), payload.data (), payload.size ());

    //  Set metadata only if there is some.
    //  NOTE(review): the pointer was queued while its source message was
    //  still open - confirm it is guaranteed to be valid here.
    metadata_t *const metadata = pending_metadata.front ();
    if (metadata)
        msg_->set_metadata (metadata);

    msg_->set_flags (pending_flags.front ());

    //  The entry has been delivered; drop it from all three queues.
    pending_data.pop_front ();
    pending_metadata.pop_front ();
    pending_flags.pop_front ();
    return 0;
}

bool zmq::xpub_t::xhas_in ()
{
279
    return !pending_data.empty ();
280 281 282 283 284 285 286 287
}

void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
    void *arg_)
{
    xpub_t *self = (xpub_t*) arg_;

    if (self->options.type != ZMQ_PUB) {
288 289
        //  Place the unsubscription to the queue of pending (un)subscriptions
        //  to be retrieved by the user later on.
290 291
        blob_t unsub (size_ + 1, 0);
        unsub [0] = 0;
Martin Hurton's avatar
Martin Hurton committed
292 293
        if (size_ > 0)
            memcpy (&unsub [1], data_, size_);
294
        self->pending_data.push_back (unsub);
295
        self->pending_metadata.push_back (NULL);
296
        self->pending_flags.push_back (0);
297 298 299 300 301

        if (self->manual) {
            self->last_pipe = NULL;
            self->pending_pipes.push_back (NULL);
        }
302
    }
303
}