dish.cpp 8.53 KB
Newer Older
somdoron's avatar
somdoron committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
somdoron's avatar
somdoron committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
somdoron's avatar
somdoron committed
31 32 33 34 35 36 37 38
#include <string.h>

#include "macros.hpp"
#include "dish.hpp"
#include "err.hpp"

zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    socket_base_t (parent_, tid_, sid_, true),
39
    _has_message (false)
somdoron's avatar
somdoron committed
40 41 42 43 44
{
    options.type = ZMQ_DISH;

    //  When socket is being closed down we don't want to wait till pending
    //  subscription commands are sent to the wire.
45
    options.linger.store (0);
somdoron's avatar
somdoron committed
46

47
    int rc = _message.init ();
somdoron's avatar
somdoron committed
48 49 50 51 52
    errno_assert (rc == 0);
}

zmq::dish_t::~dish_t ()
{
53
    int rc = _message.close ();
somdoron's avatar
somdoron committed
54 55 56
    errno_assert (rc == 0);
}

57 58 59
void zmq::dish_t::xattach_pipe (pipe_t *pipe_,
                                bool subscribe_to_all_,
                                bool locally_initiated_)
somdoron's avatar
somdoron committed
60 61
{
    LIBZMQ_UNUSED (subscribe_to_all_);
62
    LIBZMQ_UNUSED (locally_initiated_);
somdoron's avatar
somdoron committed
63 64

    zmq_assert (pipe_);
65 66
    _fq.attach (pipe_);
    _dist.attach (pipe_);
somdoron's avatar
somdoron committed
67 68 69 70 71 72 73

    //  Send all the cached subscriptions to the new upstream peer.
    send_subscriptions (pipe_);
}

void zmq::dish_t::xread_activated (pipe_t *pipe_)
{
74
    _fq.activated (pipe_);
somdoron's avatar
somdoron committed
75 76 77 78
}

void zmq::dish_t::xwrite_activated (pipe_t *pipe_)
{
79
    _dist.activated (pipe_);
somdoron's avatar
somdoron committed
80 81 82 83
}

void zmq::dish_t::xpipe_terminated (pipe_t *pipe_)
{
84 85
    _fq.pipe_terminated (pipe_);
    _dist.pipe_terminated (pipe_);
somdoron's avatar
somdoron committed
86 87 88 89 90 91 92 93
}

void zmq::dish_t::xhiccuped (pipe_t *pipe_)
{
    //  Send all the cached subscriptions to the hiccuped pipe.
    send_subscriptions (pipe_);
}

94
int zmq::dish_t::xjoin (const char *group_)
somdoron's avatar
somdoron committed
95
{
96 97 98
    std::string group = std::string (group_);

    if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
somdoron's avatar
somdoron committed
99 100 101 102 103
        errno = EINVAL;
        return -1;
    }

    //  User cannot join same group twice
104
    if (!_subscriptions.insert (group).second) {
somdoron's avatar
somdoron committed
105 106 107 108 109
        errno = EINVAL;
        return -1;
    }

    msg_t msg;
110
    int rc = msg.init_join ();
somdoron's avatar
somdoron committed
111 112
    errno_assert (rc == 0);

113 114
    rc = msg.set_group (group_);
    errno_assert (rc == 0);
somdoron's avatar
somdoron committed
115 116

    int err = 0;
117
    rc = _dist.send_to_all (&msg);
somdoron's avatar
somdoron committed
118 119 120 121 122 123 124 125 126
    if (rc != 0)
        err = errno;
    int rc2 = msg.close ();
    errno_assert (rc2 == 0);
    if (rc != 0)
        errno = err;
    return rc;
}

127
int zmq::dish_t::xleave (const char *group_)
somdoron's avatar
somdoron committed
128
{
129 130 131
    std::string group = std::string (group_);

    if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
somdoron's avatar
somdoron committed
132 133 134 135
        errno = EINVAL;
        return -1;
    }

136
    if (0 == _subscriptions.erase (group)) {
somdoron's avatar
somdoron committed
137 138 139 140 141
        errno = EINVAL;
        return -1;
    }

    msg_t msg;
142
    int rc = msg.init_leave ();
somdoron's avatar
somdoron committed
143 144
    errno_assert (rc == 0);

145 146
    rc = msg.set_group (group_);
    errno_assert (rc == 0);
somdoron's avatar
somdoron committed
147 148

    int err = 0;
149
    rc = _dist.send_to_all (&msg);
somdoron's avatar
somdoron committed
150 151 152 153 154 155 156 157 158 159 160
    if (rc != 0)
        err = errno;
    int rc2 = msg.close ();
    errno_assert (rc2 == 0);
    if (rc != 0)
        errno = err;
    return rc;
}

int zmq::dish_t::xsend (msg_t *msg_)
{
161
    LIBZMQ_UNUSED (msg_);
somdoron's avatar
somdoron committed
162 163 164 165 166 167 168 169 170 171 172 173 174 175
    errno = ENOTSUP;
    return -1;
}

bool zmq::dish_t::xhas_out ()
{
    //  Subscription can be added/removed anytime.
    return true;
}

int zmq::dish_t::xrecv (msg_t *msg_)
{
    //  If there's already a message prepared by a previous call to zmq_poll,
    //  return it straight ahead.
176
    if (_has_message) {
177
        const int rc = msg_->move (_message);
somdoron's avatar
somdoron committed
178
        errno_assert (rc == 0);
179
        _has_message = false;
somdoron's avatar
somdoron committed
180 181 182
        return 0;
    }

183 184 185 186 187
    return xxrecv (msg_);
}

int zmq::dish_t::xxrecv (msg_t *msg_)
{
188
    do {
189
        //  Get a message using fair queueing algorithm.
190
        const int rc = _fq.recv (msg_);
somdoron's avatar
somdoron committed
191

192 193 194 195 196
        //  If there's no message available, return immediately.
        //  The same when error occurs.
        if (rc != 0)
            return -1;

197 198 199 200 201
        //  Skip non matching messages
    } while (0 == _subscriptions.count (std::string (msg_->group ())));

    //  Found a matching message
    return 0;
somdoron's avatar
somdoron committed
202 203 204 205 206 207
}

bool zmq::dish_t::xhas_in ()
{
    //  If there's already a message prepared by a previous call to zmq_poll,
    //  return straight ahead.
208
    if (_has_message)
somdoron's avatar
somdoron committed
209 210
        return true;

211 212 213 214 215
    const int rc = xxrecv (&_message);
    if (rc != 0) {
        errno_assert (errno == EAGAIN);
        return false;
    }
216 217 218 219

    //  Matching message found
    _has_message = true;
    return true;
somdoron's avatar
somdoron committed
220 221 222 223
}

void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
{
224 225 226
    for (subscriptions_t::iterator it = _subscriptions.begin (),
                                   end = _subscriptions.end ();
         it != end; ++it) {
somdoron's avatar
somdoron committed
227
        msg_t msg;
228 229 230
        int rc = msg.init_join ();
        errno_assert (rc == 0);

231
        rc = msg.set_group (it->c_str ());
somdoron's avatar
somdoron committed
232 233 234
        errno_assert (rc == 0);

        //  Send it to the pipe.
235 236
        pipe_->write (&msg);
        msg.close ();
somdoron's avatar
somdoron committed
237 238 239 240
    }

    pipe_->flush ();
}
241

242 243 244 245 246
zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_,
                                     bool connect_,
                                     socket_base_t *socket_,
                                     const options_t &options_,
                                     address_t *addr_) :
247
    session_base_t (io_thread_, connect_, socket_, options_, addr_),
248
    _state (group)
249 250 251 252 253 254 255 256 257
{
}

zmq::dish_session_t::~dish_session_t ()
{
}

int zmq::dish_session_t::push_msg (msg_t *msg_)
{
258
    if (_state == group) {
259
        if ((msg_->flags () & msg_t::more) != msg_t::more) {
260 261 262 263
            errno = EFAULT;
            return -1;
        }

264
        if (msg_->size () > ZMQ_GROUP_MAX_LENGTH) {
265 266 267 268
            errno = EFAULT;
            return -1;
        }

269 270
        _group_msg = *msg_;
        _state = body;
271 272 273 274

        int rc = msg_->init ();
        errno_assert (rc == 0);
        return 0;
275 276 277 278 279 280 281
    }
    const char *group_setting = msg_->group ();
    int rc;
    if (group_setting[0] != 0)
        goto has_group;

    //  Set the message group
282 283
    rc = msg_->set_group (static_cast<char *> (_group_msg.data ()),
                          _group_msg.size ());
284
    errno_assert (rc == 0);
285

286
    //  We set the group, so we don't need the group_msg anymore
287
    rc = _group_msg.close ();
288 289 290 291 292 293 294
    errno_assert (rc == 0);
has_group:
    //  Thread safe socket doesn't support multipart messages
    if ((msg_->flags () & msg_t::more) == msg_t::more) {
        errno = EFAULT;
        return -1;
    }
295

296 297
    //  Push message to dish socket
    rc = session_base_t::push_msg (msg_);
298

299
    if (rc == 0)
300
        _state = group;
301

302
    return rc;
303 304
}

305 306 307 308 309 310 311 312 313 314
int zmq::dish_session_t::pull_msg (msg_t *msg_)
{
    int rc = session_base_t::pull_msg (msg_);

    if (rc != 0)
        return rc;

    if (!msg_->is_join () && !msg_->is_leave ())
        return rc;

315
    int group_length = static_cast<int> (strlen (msg_->group ()));
316

317 318
    msg_t command;
    int offset;
319

320 321
    if (msg_->is_join ()) {
        rc = command.init_size (group_length + 5);
322
        errno_assert (rc == 0);
323 324 325 326 327 328 329 330
        offset = 5;
        memcpy (command.data (), "\4JOIN", 5);
    } else {
        rc = command.init_size (group_length + 6);
        errno_assert (rc == 0);
        offset = 6;
        memcpy (command.data (), "\5LEAVE", 6);
    }
331

332 333
    command.set_flags (msg_t::command);
    char *command_data = static_cast<char *> (command.data ());
334

335 336 337 338 339 340 341 342 343 344
    //  Copy the group
    memcpy (command_data + offset, msg_->group (), group_length);

    //  Close the join message
    rc = msg_->close ();
    errno_assert (rc == 0);

    *msg_ = command;

    return 0;
345 346
}

347 348 349
void zmq::dish_session_t::reset ()
{
    session_base_t::reset ();
350
    _state = group;
351
}