xpub.cpp 5.56 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 21
#include <string.h>

22 23
#include "xpub.hpp"
#include "pipe.hpp"
24 25
#include "err.hpp"
#include "msg.hpp"
26

27 28
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    socket_base_t (parent_, tid_, sid_),
Martin Hurton's avatar
Martin Hurton committed
29 30 31
    verbose (false),
    more (false),
    lossy (true)
32
{
33
    options.type = ZMQ_XPUB;
34 35 36 37 38 39
}

zmq::xpub_t::~xpub_t ()
{
}

40
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
41
{
42 43
    zmq_assert (pipe_);
    dist.attach (pipe_);
44

45
    //  If subscribe_to_all_ is specified, the caller would like to subscribe
46
    //  to all data on this pipe, implicitly.
47
    if (subscribe_to_all_)
48 49
        subscriptions.add (NULL, 0, pipe_);

50 51 52
    //  The pipe is active when attached. Let's read the subscriptions from
    //  it, if any.
    xread_activated (pipe_);
53 54 55 56
}

void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{
57 58
    //  There are some subscriptions waiting. Let's process them.
    msg_t sub;
59
    while (pipe_->read (&sub)) {
60 61
        //  Apply the subscription to the trie
        unsigned char *const data = (unsigned char *) sub.data ();
62
        const size_t size = sub.size ();
63 64 65 66 67 68 69 70
        if (size > 0 && (*data == 0 || *data == 1)) {
            bool unique;
            if (*data == 0)
                unique = subscriptions.rm (data + 1, size - 1, pipe_);
            else
                unique = subscriptions.add (data + 1, size - 1, pipe_);

            //  If the subscription is not a duplicate store it so that it can be
John Muehlhausen's avatar
John Muehlhausen committed
71
            //  passed to used on next recv call. (Unsubscribe is not verbose.)
72 73 74 75
            if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) {
                pending_data.push_back (blob_t (data, size));
                pending_flags.push_back (0);
            }
76
        }
77
        else {
78
            //  Process user message coming upstream from xsub socket
79 80 81
            pending_data.push_back (blob_t (data, size));
            pending_flags.push_back (sub.flags ());
        }
82
        sub.close ();
83
    }
84 85
}

86
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
87 88 89 90
{
    dist.activated (pipe_);
}

Pieter Hintjens's avatar
Pieter Hintjens committed
91 92 93 94 95 96 97
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
    size_t optvallen_)
{
    if (optvallen_ != sizeof (int) || *static_cast <const int*> (optval_) < 0) {
        errno = EINVAL;
        return -1;
    }
Martin Hurton's avatar
Martin Hurton committed
98
    if (option_ == ZMQ_XPUB_VERBOSE)
99
        verbose = (*static_cast <const int*> (optval_) != 0);
Martin Hurton's avatar
Martin Hurton committed
100
    else
101 102 103 104 105 106
    if (option_ == ZMQ_XPUB_NODROP)
        lossy = (*static_cast <const int*> (optval_) == 0);
    else {
        errno = EINVAL;
        return -1;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
107 108 109
    return 0;
}

110
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
111
{
112 113 114 115 116
    //  Remove the pipe from the trie. If there are topics that nobody
    //  is interested in anymore, send corresponding unsubscriptions
    //  upstream.
    subscriptions.rm (pipe_, send_unsubscription, this);

117
    dist.pipe_terminated (pipe_);
118 119
}

120 121 122 123 124 125
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
{
    xpub_t *self = (xpub_t*) arg_;
    self->dist.match (pipe_);
}

126
int zmq::xpub_t::xsend (msg_t *msg_)
127
{
128
    bool msg_more = msg_->flags () & msg_t::more ? true : false;
129 130 131 132 133

    //  For the first part of multi-part message, find the matching pipes.
    if (!more)
        subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
            mark_as_matching, this);
134

135 136 137 138 139 140 141 142 143 144
    int rc = -1;            //  Assume we fail
    if (lossy || dist.check_hwm ()) {
        if (dist.send_to_matching (msg_) == 0) {
            //  If we are at the end of multi-part message we can mark 
            //  all the pipes as non-matching.
            if (!msg_more)
                dist.unmatch ();
            more = msg_more;
            rc = 0;         //  Yay, sent successfully
        }
145
    }
146 147 148
    else
        errno = EAGAIN;
    return rc;
149 150 151 152
}

bool zmq::xpub_t::xhas_out ()
{
153
    return dist.has_out ();
154 155
}

156
int zmq::xpub_t::xrecv (msg_t *msg_)
157
{
Martin Hurton's avatar
Martin Hurton committed
158
    //  If there is at least one
159
    if (pending_data.empty ()) {
160 161 162
        errno = EAGAIN;
        return -1;
    }
163 164 165

    int rc = msg_->close ();
    errno_assert (rc == 0);
166
    rc = msg_->init_size (pending_data.front ().size ());
167
    errno_assert (rc == 0);
168 169 170 171 172 173
    memcpy (msg_->data (),
        pending_data.front ().data (),
        pending_data.front ().size ());
    msg_->set_flags (pending_flags.front ());
    pending_data.pop_front ();
    pending_flags.pop_front ();
174
    return 0;
175 176 177 178
}

bool zmq::xpub_t::xhas_in ()
{
179
    return !pending_data.empty ();
180 181 182 183 184 185 186 187
}

void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
    void *arg_)
{
    xpub_t *self = (xpub_t*) arg_;

    if (self->options.type != ZMQ_PUB) {
188 189 190 191 192
        //  Place the unsubscription to the queue of pending (un)sunscriptions
        //  to be retrived by the user later on.
        blob_t unsub (size_ + 1, 0);
        unsub [0] = 0;
        memcpy (&unsub [1], data_, size_);
193 194
        self->pending_data.push_back (unsub);
        self->pending_flags.push_back (0);
195
    }
196
}