xpub.cpp 5.33 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 21
#include <string.h>

22 23
#include "xpub.hpp"
#include "pipe.hpp"
24 25
#include "err.hpp"
#include "msg.hpp"
26

27 28
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    socket_base_t (parent_, tid_, sid_),
Pieter Hintjens's avatar
Pieter Hintjens committed
29
    verbose(false),
30
    more (false)
31
{
32
    options.type = ZMQ_XPUB;
33 34 35 36 37 38
}

zmq::xpub_t::~xpub_t ()
{
}

39
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
40
{
41 42
    zmq_assert (pipe_);
    dist.attach (pipe_);
43

44
    //  If subscribe_to_all_ is specified, the caller would like to subscribe
45
    //  to all data on this pipe, implicitly.
46
    if (subscribe_to_all_)
47 48
        subscriptions.add (NULL, 0, pipe_);

49 50 51
    //  The pipe is active when attached. Let's read the subscriptions from
    //  it, if any.
    xread_activated (pipe_);
52 53 54 55
}

void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{
56 57
    //  There are some subscriptions waiting. Let's process them.
    msg_t sub;
58
    while (pipe_->read (&sub)) {
59 60
        //  Apply the subscription to the trie
        unsigned char *const data = (unsigned char *) sub.data ();
61
        const size_t size = sub.size ();
62 63 64 65 66 67 68 69
        if (size > 0 && (*data == 0 || *data == 1)) {
            bool unique;
            if (*data == 0)
                unique = subscriptions.rm (data + 1, size - 1, pipe_);
            else
                unique = subscriptions.add (data + 1, size - 1, pipe_);

            //  If the subscription is not a duplicate store it so that it can be
John Muehlhausen's avatar
John Muehlhausen committed
70
            //  passed to used on next recv call. (Unsubscribe is not verbose.)
71 72 73 74
            if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) {
                pending_data.push_back (blob_t (data, size));
                pending_flags.push_back (0);
            }
75
        }
76
        else {
77
            //  Process user message coming upstream from xsub socket
78 79 80
            pending_data.push_back (blob_t (data, size));
            pending_flags.push_back (sub.flags ());
        }
81
        sub.close ();
82
    }
83 84
}

85
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
86 87 88 89
{
    dist.activated (pipe_);
}

Pieter Hintjens's avatar
Pieter Hintjens committed
90 91 92 93 94 95 96 97 98 99 100
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
    size_t optvallen_)
{
    if (option_ != ZMQ_XPUB_VERBOSE) {
        errno = EINVAL;
        return -1;
    }
    if (optvallen_ != sizeof (int) || *static_cast <const int*> (optval_) < 0) {
        errno = EINVAL;
        return -1;
    }
101
    verbose = (*static_cast <const int*> (optval_) != 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
102 103 104
    return 0;
}

105
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
106
{
107 108 109 110 111
    //  Remove the pipe from the trie. If there are topics that nobody
    //  is interested in anymore, send corresponding unsubscriptions
    //  upstream.
    subscriptions.rm (pipe_, send_unsubscription, this);

112
    dist.pipe_terminated (pipe_);
113 114
}

115 116 117 118 119 120
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
{
    xpub_t *self = (xpub_t*) arg_;
    self->dist.match (pipe_);
}

121
int zmq::xpub_t::xsend (msg_t *msg_)
122
{
123
    bool msg_more = msg_->flags () & msg_t::more ? true : false;
124 125 126 127 128

    //  For the first part of multi-part message, find the matching pipes.
    if (!more)
        subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
            mark_as_matching, this);
129 130 131

    //  Send the message to all the pipes that were marked as matching
    //  in the previous step.
132
    int rc = dist.send_to_matching (msg_);
133 134 135 136 137 138 139 140 141 142 143
    if (rc != 0)
        return rc;

    //  If we are at the end of multi-part message we can mark all the pipes
    //  as non-matching.
    if (!msg_more)
        dist.unmatch ();

    more = msg_more;

    return 0;
144 145 146 147
}

bool zmq::xpub_t::xhas_out ()
{
148
    return dist.has_out ();
149 150
}

151
int zmq::xpub_t::xrecv (msg_t *msg_)
152
{
153
    //  If there is at least one 
154
    if (pending_data.empty ()) {
155 156 157
        errno = EAGAIN;
        return -1;
    }
158 159 160

    int rc = msg_->close ();
    errno_assert (rc == 0);
161
    rc = msg_->init_size (pending_data.front ().size ());
162
    errno_assert (rc == 0);
163 164 165 166 167 168
    memcpy (msg_->data (),
        pending_data.front ().data (),
        pending_data.front ().size ());
    msg_->set_flags (pending_flags.front ());
    pending_data.pop_front ();
    pending_flags.pop_front ();
169
    return 0;
170 171 172 173
}

bool zmq::xpub_t::xhas_in ()
{
174
    return !pending_data.empty ();
175 176 177 178 179 180 181 182
}

void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
    void *arg_)
{
    xpub_t *self = (xpub_t*) arg_;

    if (self->options.type != ZMQ_PUB) {
183 184 185 186 187
        //  Place the unsubscription to the queue of pending (un)sunscriptions
        //  to be retrived by the user later on.
        blob_t unsub (size_ + 1, 0);
        unsub [0] = 0;
        memcpy (&unsub [1], data_, size_);
188 189
        self->pending_data.push_back (unsub);
        self->pending_flags.push_back (0);
190
    }
191
}