/*
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 21
#include <string.h>

22 23
#include "xpub.hpp"
#include "pipe.hpp"
24 25
#include "err.hpp"
#include "msg.hpp"
26

27 28
//  Constructs an XPUB socket. Verbose and manual modes start disabled,
//  the socket starts lossy, no multi-part send is in progress and the
//  welcome message is initialised without any payload.
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    socket_base_t (parent_, tid_, sid_),
    verbose (false),
    more (false),
    lossy (true),
    manual (false),
    welcome_msg ()
{
    options.type = ZMQ_XPUB;

    //  No pipe has delivered a subscription request yet.
    last_pipe = NULL;

    //  Put the welcome message into a defined state so that it can be
    //  queried and closed safely later on.
    welcome_msg.init ();
}

//  Destroys the socket, closing the welcome message it holds.
zmq::xpub_t::~xpub_t ()
{
    welcome_msg.close ();
}

45
//  Registers a newly attached pipe with the distributor, optionally
//  subscribes it to everything, sends it the welcome message (when one is
//  configured) and finally processes whatever is already waiting on it.
//
//  pipe_              the pipe being attached; must not be NULL.
//  subscribe_to_all_  when true the pipe is implicitly subscribed to all
//                     data.
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
    zmq_assert (pipe_);
    dist.attach (pipe_);

    //  If subscribe_to_all_ is specified, the caller would like to subscribe
    //  to all data on this pipe, implicitly.
    if (subscribe_to_all_)
        subscriptions.add (NULL, 0, pipe_);

    //  If a welcome message is configured, push a copy of it down the pipe.
    if (welcome_msg.size () > 0) {
        msg_t copy;
        int rc = copy.init ();
        zmq_assert (rc == 0);
        rc = copy.copy (welcome_msg);
        zmq_assert (rc == 0);

        //  NOTE(review): a refused write is expected to leave ownership of
        //  the message with the caller (see pipe_t::write) - confirm. In
        //  that case the copy must be released here or it would leak.
        const bool written = pipe_->write (&copy);
        if (!written) {
            rc = copy.close ();
            zmq_assert (rc == 0);
        }
        pipe_->flush ();
    }

    //  The pipe is active when attached. Let's read the subscriptions from
    //  it, if any.
    xread_activated (pipe_);
}

//  Drains every message currently readable from the given pipe.
//
//  A non-empty message whose first byte is 0 or 1 is a subscription
//  request (0 removes, 1 adds; the remaining bytes are the topic prefix).
//  Anything else is treated as a user message coming upstream and is queued
//  together with its flags for a later xrecv call.
void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{
    msg_t incoming;
    while (pipe_->read (&incoming)) {
        unsigned char *const data = (unsigned char *) incoming.data ();
        const size_t size = incoming.size ();

        //  The first byte is only inspected when the message is not empty.
        const bool is_request = size > 0 && (*data == 0 || *data == 1);

        if (!is_request) {
            //  Process user message coming upstream from xsub socket
            pending_data.push_back (blob_t (data, size));
            pending_flags.push_back (incoming.flags ());
        }
        else
        if (manual) {
            //  In manual mode the trie is left untouched here: remember
            //  which pipe asked and hand the raw request to the user.
            last_pipe = pipe_;
            pending_data.push_back (blob_t (data, size));
            pending_flags.push_back (0);
        }
        else {
            //  Apply the request to the trie.
            const bool subscribing = *data != 0;
            const bool unique = subscribing
                ? subscriptions.add (data + 1, size - 1, pipe_)
                : subscriptions.rm (data + 1, size - 1, pipe_);

            //  If the request is not a duplicate, store it so that it can
            //  be passed to the user on the next recv call. Duplicate
            //  subscriptions are stored as well in verbose mode.
            //  (Unsubscribe is not verbose.)
            const bool report = unique || (subscribing && verbose);
            if (options.type == ZMQ_XPUB && report) {
                pending_data.push_back (blob_t (data, size));
                pending_flags.push_back (0);
            }
        }

        incoming.close ();
    }
}

111
//  Called when the given pipe becomes writable again; the notification is
//  simply handed on to the distributor.
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
{
    dist.activated (pipe_);
}

Pieter Hintjens's avatar
Pieter Hintjens committed
116 117
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
    size_t optvallen_)
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
{	
	if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL)
	{
		if (optvallen_ != sizeof(int) || *static_cast <const int*> (optval_) < 0) {
			errno = EINVAL;
			return -1;
		}

		if (option_ == ZMQ_XPUB_VERBOSE)
			verbose = (*static_cast <const int*> (optval_) != 0);
		else
		if (option_ == ZMQ_XPUB_NODROP)
			lossy = (*static_cast <const int*> (optval_) == 0);
		else
		if (option_ == ZMQ_XPUB_MANUAL)
			manual = (*static_cast <const int*> (optval_) != 0);				
	}        
    else    
somdoron's avatar
somdoron committed
136 137
	if (option_ == ZMQ_SUBSCRIBE && manual && last_pipe != NULL)	
		subscriptions.add((unsigned char *)optval_, optvallen_, last_pipe);	
138
	else
somdoron's avatar
somdoron committed
139 140 141 142 143 144 145 146 147 148 149 150 151 152
	if (option_ == ZMQ_UNSUBSCRIBE && manual && last_pipe != NULL)	
		subscriptions.rm((unsigned char *)optval_, optvallen_, last_pipe);	
	else 
	if (option_ == ZMQ_XPUB_WELCOME_MSG) {	
		welcome_msg.close();

		if (optvallen_ > 0)	{
			welcome_msg.init_size(optvallen_);

			unsigned char *data = (unsigned char*)welcome_msg.data();
			memcpy(data, optval_, optvallen_);		
		}
		else
			welcome_msg.init();
153
	}
154 155 156 157
    else {
        errno = EINVAL;
        return -1;
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
158 159 160
    return 0;
}

161
//  Called when a pipe has been terminated. Drops every reference this
//  socket still holds to it.
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
{
    //  Remove the pipe from the trie. If there are topics that nobody
    //  is interested in anymore, send corresponding unsubscriptions
    //  upstream.
    subscriptions.rm (pipe_, send_unsubscription, this);

    //  Manual mode remembers the pipe that delivered the most recent
    //  subscription request. Forget it if it is the one going away, so a
    //  later ZMQ_SUBSCRIBE/ZMQ_UNSUBSCRIBE cannot act on a dead pipe.
    if (pipe_ == last_pipe)
        last_pipe = NULL;

    dist.pipe_terminated (pipe_);
}

171 172 173 174 175 176
//  Callback handed to the subscription trie by xsend: marks the given pipe
//  as matching in the distributor of the socket passed through arg_.
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
{
    static_cast <xpub_t*> (arg_)->dist.match (pipe_);
}

177
//  Sends one message part to every pipe whose subscriptions match.
//
//  Matching is computed only for the first part of a multi-part message
//  and reused for the following parts. Returns 0 on success, -1 on failure.
//  When the socket is not lossy and the distributor's high-water-mark check
//  fails, errno is set to EAGAIN.
int zmq::xpub_t::xsend (msg_t *msg_)
{
    const bool msg_more = (msg_->flags () & msg_t::more) != 0;

    //  For the first part of multi-part message, find the matching pipes.
    if (!more) {
        subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
            mark_as_matching, this);

        //  If inverted matching is used, reverse the selection now
        if (options.invert_matching)
            dist.reverse_match ();
    }

    //  The high-water-mark check is consulted only when the socket is not
    //  lossy.
    if (!lossy && !dist.check_hwm ()) {
        errno = EAGAIN;
        return -1;
    }

    if (dist.send_to_matching (msg_) != 0)
        return -1;

    //  If we are at the end of multi-part message we can mark
    //  all the pipes as non-matching.
    if (!msg_more)
        dist.unmatch ();
    more = msg_more;
    return 0;
}

bool zmq::xpub_t::xhas_out ()
{
209
    return dist.has_out ();
210 211
}

212
//  Hands the oldest queued item (subscription, unsubscription or upstream
//  user message) to the caller.
//
//  Returns 0 after filling msg_ with the item's bytes and flags, or -1 with
//  errno set to EAGAIN when nothing is queued.
int zmq::xpub_t::xrecv (msg_t *msg_)
{
    //  Nothing is pending, so there is nothing to deliver.
    if (pending_data.empty ()) {
        errno = EAGAIN;
        return -1;
    }

    //  Re-create the caller's message with room for the queued bytes.
    int rc = msg_->close ();
    errno_assert (rc == 0);
    const size_t size = pending_data.front ().size ();
    rc = msg_->init_size (size);
    errno_assert (rc == 0);

    memcpy (msg_->data (), pending_data.front ().data (), size);
    msg_->set_flags (pending_flags.front ());

    //  The item has been delivered; drop it from both queues.
    pending_flags.pop_front ();
    pending_data.pop_front ();
    return 0;
}

bool zmq::xpub_t::xhas_in ()
{
235
    return !pending_data.empty ();
236 237 238 239 240 241 242 243
}

void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
    void *arg_)
{
    xpub_t *self = (xpub_t*) arg_;

    if (self->options.type != ZMQ_PUB) {
244 245 246 247
        //  Place the unsubscription to the queue of pending (un)sunscriptions
        //  to be retrived by the user later on.
        blob_t unsub (size_ + 1, 0);
        unsub [0] = 0;
Martin Hurton's avatar
Martin Hurton committed
248 249
        if (size_ > 0)
            memcpy (&unsub [1], data_, size_);
250 251
        self->pending_data.push_back (unsub);
        self->pending_flags.push_back (0);
252
    }
253
}