/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32 33 34 35 36 37 38
#include "msg.hpp"

#include <string.h>
#include <stdlib.h>
#include <new>

#include "stdint.hpp"
39
#include "likely.hpp"
40
#include "metadata.hpp"
41 42
#include "err.hpp"

43
//  Check whether the sizes of public representation of the message (zmq_msg_t)
44
//  and private representation of the message (zmq::msg_t) match.
45

46 47 48
typedef char
  zmq_msg_size_check[2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0)
                     - 1];
49

50
bool zmq::msg_t::check () const
51
{
52
    return _u.base.type >= type_min && _u.base.type <= type_max;
53 54
}

55 56 57 58 59
int zmq::msg_t::init (void *data_,
                      size_t size_,
                      msg_free_fn *ffn_,
                      void *hint_,
                      content_t *content_)
60
{
61
    if (size_ < max_vsm_size) {
62
        const int rc = init_size (size_);
63

64 65
        if (rc != -1) {
            memcpy (data (), data_, size_);
66 67
            return 0;
        }
68
        return -1;
69 70
    }
    if (content_) {
71
        return init_external_storage (content_, data_, size_, ffn_, hint_);
72
    }
73
    return init_data (data_, size_, ffn_, hint_);
74 75
}

76 77
//  Initialises the message as an empty VSM: zero size, no flags, no
//  metadata, empty group and routing id 0. Always returns 0.
int zmq::msg_t::init ()
{
    _u.vsm.type = type_vsm;
    _u.vsm.size = 0;
    _u.vsm.flags = 0;
    _u.vsm.metadata = NULL;
    _u.vsm.routing_id = 0;
    _u.vsm.group[0] = '\0';
    return 0;
}

87
int zmq::msg_t::init_size (size_t size_)
88
{
89
    if (size_ <= max_vsm_size) {
90 91 92 93 94 95
        _u.vsm.metadata = NULL;
        _u.vsm.type = type_vsm;
        _u.vsm.flags = 0;
        _u.vsm.size = static_cast<unsigned char> (size_);
        _u.vsm.group[0] = '\0';
        _u.vsm.routing_id = 0;
96
    } else {
97 98 99 100 101 102
        _u.lmsg.metadata = NULL;
        _u.lmsg.type = type_lmsg;
        _u.lmsg.flags = 0;
        _u.lmsg.group[0] = '\0';
        _u.lmsg.routing_id = 0;
        _u.lmsg.content = NULL;
103
        if (sizeof (content_t) + size_ > size_)
104
            _u.lmsg.content =
105
              static_cast<content_t *> (malloc (sizeof (content_t) + size_));
106
        if (unlikely (!_u.lmsg.content)) {
107 108 109
            errno = ENOMEM;
            return -1;
        }
110

111 112 113 114 115
        _u.lmsg.content->data = _u.lmsg.content + 1;
        _u.lmsg.content->size = size_;
        _u.lmsg.content->ffn = NULL;
        _u.lmsg.content->hint = NULL;
        new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t ();
116 117 118 119
    }
    return 0;
}

120 121 122 123 124
//  Initialises a zero-copy message (type_zclmsg) whose content_t bookkeeping
//  structure is supplied by the caller instead of being allocated here.
//
//  content_ - storage for the content descriptor; must not be NULL
//  data_    - payload buffer; must not be NULL
//  size_    - payload size in bytes
//  ffn_     - deallocation function recorded in the descriptor
//  hint_    - opaque argument recorded alongside ffn_
//
//  Always returns 0.
int zmq::msg_t::init_external_storage (content_t *content_,
                                       void *data_,
                                       size_t size_,
                                       msg_free_fn *ffn_,
                                       void *hint_)
{
    zmq_assert (NULL != data_);
    zmq_assert (NULL != content_);

    //  Fill in the caller-provided descriptor first...
    content_->data = data_;
    content_->size = size_;
    content_->ffn = ffn_;
    content_->hint = hint_;
    new (&content_->refcnt) zmq::atomic_counter_t ();

    //  ...then set up the message header and attach the descriptor to it.
    _u.zclmsg.type = type_zclmsg;
    _u.zclmsg.flags = 0;
    _u.zclmsg.metadata = NULL;
    _u.zclmsg.routing_id = 0;
    _u.zclmsg.group[0] = '\0';
    _u.zclmsg.content = content_;

    return 0;
}

145 146 147 148
int zmq::msg_t::init_data (void *data_,
                           size_t size_,
                           msg_free_fn *ffn_,
                           void *hint_)
149
{
150 151
    //  If data is NULL and size is not 0, a segfault
    //  would occur once the data is accessed
152
    zmq_assert (data_ != NULL || size_ == 0);
153

154
    //  Initialize constant message if there's no need to deallocate
155
    if (ffn_ == NULL) {
156 157 158 159 160 161 162
        _u.cmsg.metadata = NULL;
        _u.cmsg.type = type_cmsg;
        _u.cmsg.flags = 0;
        _u.cmsg.data = data_;
        _u.cmsg.size = size_;
        _u.cmsg.group[0] = '\0';
        _u.cmsg.routing_id = 0;
163
    } else {
164 165 166 167 168 169 170 171
        _u.lmsg.metadata = NULL;
        _u.lmsg.type = type_lmsg;
        _u.lmsg.flags = 0;
        _u.lmsg.group[0] = '\0';
        _u.lmsg.routing_id = 0;
        _u.lmsg.content =
          static_cast<content_t *> (malloc (sizeof (content_t)));
        if (!_u.lmsg.content) {
172 173 174
            errno = ENOMEM;
            return -1;
        }
175

176 177 178 179 180
        _u.lmsg.content->data = data_;
        _u.lmsg.content->size = size_;
        _u.lmsg.content->ffn = ffn_;
        _u.lmsg.content->hint = hint_;
        new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t ();
181
    }
182 183 184
    return 0;
}

185
int zmq::msg_t::init_delimiter ()
186
{
187 188 189 190 191
    _u.delimiter.metadata = NULL;
    _u.delimiter.type = type_delimiter;
    _u.delimiter.flags = 0;
    _u.delimiter.group[0] = '\0';
    _u.delimiter.routing_id = 0;
192 193 194
    return 0;
}

195 196
//  Initialises the message as a join message (type_join) with no flags,
//  no metadata, an empty group and routing id 0. Always returns 0.
int zmq::msg_t::init_join ()
{
    _u.base.type = type_join;
    _u.base.flags = 0;
    _u.base.metadata = NULL;
    _u.base.routing_id = 0;
    _u.base.group[0] = '\0';
    return 0;
}

//  Initialises the message as a leave message (type_leave) with no flags,
//  no metadata, an empty group and routing id 0. Always returns 0.
int zmq::msg_t::init_leave ()
{
    _u.base.type = type_leave;
    _u.base.flags = 0;
    _u.base.metadata = NULL;
    _u.base.routing_id = 0;
    _u.base.group[0] = '\0';
    return 0;
}

215 216 217 218
int zmq::msg_t::close ()
{
    //  Check the validity of the message.
    if (unlikely (!check ())) {
219 220 221 222
        errno = EFAULT;
        return -1;
    }

223
    if (_u.base.type == type_lmsg) {
224
        //  If the content is not shared, or if it is shared and the reference
225
        //  count has dropped to zero, deallocate it.
226 227
        if (!(_u.lmsg.flags & msg_t::shared)
            || !_u.lmsg.content->refcnt.sub (1)) {
228 229
            //  We used "placement new" operator to initialize the reference
            //  counter so we call the destructor explicitly now.
230
            _u.lmsg.content->refcnt.~atomic_counter_t ();
231

232 233 234 235
            if (_u.lmsg.content->ffn)
                _u.lmsg.content->ffn (_u.lmsg.content->data,
                                      _u.lmsg.content->hint);
            free (_u.lmsg.content);
236 237 238
        }
    }

239
    if (is_zcmsg ()) {
240
        zmq_assert (_u.zclmsg.content->ffn);
241 242 243

        //  If the content is not shared, or if it is shared and the reference
        //  count has dropped to zero, deallocate it.
244 245
        if (!(_u.zclmsg.flags & msg_t::shared)
            || !_u.zclmsg.content->refcnt.sub (1)) {
246 247
            //  We used "placement new" operator to initialize the reference
            //  counter so we call the destructor explicitly now.
248
            _u.zclmsg.content->refcnt.~atomic_counter_t ();
249

250 251
            _u.zclmsg.content->ffn (_u.zclmsg.content->data,
                                    _u.zclmsg.content->hint);
252 253 254
        }
    }

255 256 257
    if (_u.base.metadata != NULL) {
        if (_u.base.metadata->drop_ref ()) {
            LIBZMQ_DELETE (_u.base.metadata);
258
        }
259
        _u.base.metadata = NULL;
260
    }
261

262
    //  Make the message invalid.
263
    _u.base.type = 0;
264 265 266 267

    return 0;
}

268
//  Moves the content of src_ into this message.
//
//  This message is closed first, then takes over src_'s representation;
//  afterwards src_ is re-initialised as an empty message.
//
//  Returns 0 on success. If src_ is not valid, errno is set to EFAULT and
//  -1 is returned; a negative result from close () or init () is passed on.
int zmq::msg_t::move (msg_t &src_)
{
    //  The source has to be a valid message.
    if (unlikely (!src_.check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Drop whatever this message currently holds.
    const int close_rc = close ();
    if (unlikely (close_rc < 0))
        return close_rc;

    *this = src_;

    //  Leave the source behind as an empty message.
    const int init_rc = src_.init ();
    if (unlikely (init_rc < 0))
        return init_rc;

    return 0;
}

289
//  Makes this message a copy of src_.
//
//  This message is closed first. For long and zero-copy messages the
//  payload is not duplicated: the content becomes (or stays) shared and its
//  reference count is increased. A metadata reference is added when src_
//  carries metadata.
//
//  Returns 0 on success. If src_ is not valid, errno is set to EFAULT and
//  -1 is returned; a negative result from close () is passed on.
int zmq::msg_t::copy (msg_t &src_)
{
    //  The source has to be a valid message.
    if (unlikely (!src_.check ())) {
        errno = EFAULT;
        return -1;
    }

    const int rc = close ();
    if (unlikely (rc < 0))
        return rc;

    //  Reference count assigned when previously unshared content becomes
    //  shared: one reference for the original, one for the copy made here.
    const atomic_counter_t::integer_t initial_shared_refcnt = 2;

    if (src_.is_lmsg () || src_.is_zcmsg ()) {
        if (!(src_.flags () & msg_t::shared)) {
            //  First copy: turn the source into a shared message.
            src_.set_flags (msg_t::shared);
            src_.refcnt ()->set (initial_shared_refcnt);
        } else {
            //  Already shared: just account for one more holder.
            src_.refcnt ()->add (1);
        }
    }

    zmq::metadata_t *const src_metadata = src_.metadata ();
    if (src_metadata != NULL)
        src_metadata->add_ref ();

    *this = src_;

    return 0;
}

void *zmq::msg_t::data ()
{
    //  Check the validity of the message.
    zmq_assert (check ());

329
    switch (_u.base.type) {
330
        case type_vsm:
331
            return _u.vsm.data;
332
        case type_lmsg:
333
            return _u.lmsg.content->data;
334
        case type_cmsg:
335
            return _u.cmsg.data;
336
        case type_zclmsg:
337
            return _u.zclmsg.content->data;
338 339 340
        default:
            zmq_assert (false);
            return NULL;
341
    }
342 343
}

344
size_t zmq::msg_t::size () const
345
{
346 347
    //  Check the validity of the message.
    zmq_assert (check ());
348

349
    switch (_u.base.type) {
350
        case type_vsm:
351
            return _u.vsm.size;
352
        case type_lmsg:
353
            return _u.lmsg.content->size;
354
        case type_zclmsg:
355
            return _u.zclmsg.content->size;
356
        case type_cmsg:
357
            return _u.cmsg.size;
358 359 360
        default:
            zmq_assert (false);
            return 0;
361 362
    }
}
363

364
unsigned char zmq::msg_t::flags () const
365
{
366
    return _u.base.flags;
367 368
}

369
void zmq::msg_t::set_flags (unsigned char flags_)
370
{
371
    _u.base.flags |= flags_;
372
}
373

374 375
void zmq::msg_t::reset_flags (unsigned char flags_)
{
376
    _u.base.flags &= ~flags_;
377 378
}

379
//  Returns the metadata attached to the message, or NULL when there is
//  none. No reference is added on behalf of the caller.
zmq::metadata_t *zmq::msg_t::metadata () const
{
    return _u.base.metadata;
}

384
//  Attaches metadata_ to the message and takes a reference to it.
//  The message must not have metadata attached already and metadata_ must
//  not be NULL; both preconditions are checked with assert.
void zmq::msg_t::set_metadata (zmq::metadata_t *metadata_)
{
    assert (metadata_ != NULL);
    assert (_u.base.metadata == NULL);

    _u.base.metadata = metadata_;
    _u.base.metadata->add_ref ();
}

392 393
void zmq::msg_t::reset_metadata ()
{
394 395 396
    if (_u.base.metadata) {
        if (_u.base.metadata->drop_ref ()) {
            LIBZMQ_DELETE (_u.base.metadata);
397
        }
398
        _u.base.metadata = NULL;
399 400 401
    }
}

402
bool zmq::msg_t::is_routing_id () const
Martin Hurton's avatar
Martin Hurton committed
403
{
404
    return (_u.base.flags & routing_id) == routing_id;
Martin Hurton's avatar
Martin Hurton committed
405 406
}

407 408
bool zmq::msg_t::is_credential () const
{
409
    return (_u.base.flags & credential) == credential;
410 411
}

412
bool zmq::msg_t::is_delimiter () const
413
{
414
    return _u.base.type == type_delimiter;
415 416
}

417
bool zmq::msg_t::is_vsm () const
418
{
419
    return _u.base.type == type_vsm;
420 421
}

422
bool zmq::msg_t::is_cmsg () const
423
{
424
    return _u.base.type == type_cmsg;
425 426
}

427 428 429 430 431
//  True when the message is a long message (type_lmsg).
bool zmq::msg_t::is_lmsg () const
{
    return type_lmsg == _u.base.type;
}

432
bool zmq::msg_t::is_zcmsg () const
433
{
434
    return _u.base.type == type_zclmsg;
435 436
}

437
bool zmq::msg_t::is_join () const
438
{
439
    return _u.base.type == type_join;
440 441
}

442
bool zmq::msg_t::is_leave () const
443
{
444
    return _u.base.type == type_leave;
445 446
}

447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483
//  True when the command-type bits of the flags (those selected by
//  CMD_TYPE_MASK) equal ping.
bool zmq::msg_t::is_ping () const
{
    return ping == (_u.base.flags & CMD_TYPE_MASK);
}

//  True when the command-type bits of the flags (those selected by
//  CMD_TYPE_MASK) equal pong.
bool zmq::msg_t::is_pong () const
{
    return pong == (_u.base.flags & CMD_TYPE_MASK);
}

size_t zmq::msg_t::command_body_size () const
{
    if (this->is_ping () || this->is_pong ())
        return this->size () - ping_cmd_name_size;
    if (this->is_subscribe ())
        return this->size () - sub_cmd_name_size;
    if (this->is_cancel ())
        return this->size () - cancel_cmd_name_size;

    return 0;
}

void *zmq::msg_t::command_body ()
{
    unsigned char *data = NULL;
    if (this->is_ping () || this->is_pong ())
        data =
          static_cast<unsigned char *> (this->data ()) + ping_cmd_name_size;
    if (this->is_subscribe ())
        data = static_cast<unsigned char *> (this->data ()) + sub_cmd_name_size;
    if (this->is_cancel ())
        data =
          static_cast<unsigned char *> (this->data ()) + cancel_cmd_name_size;

    return data;
}

484 485 486
void zmq::msg_t::add_refs (int refs_)
{
    zmq_assert (refs_ >= 0);
487

488
    //  Operation not supported for messages with metadata.
489
    zmq_assert (_u.base.metadata == NULL);
490

491 492 493 494
    //  No copies required.
    if (!refs_)
        return;

495 496
    //  VSMs, CMSGS and delimiters can be copied straight away. The only
    //  message type that needs special care are long messages.
497 498
    if (_u.base.type == type_lmsg || is_zcmsg ()) {
        if (_u.base.flags & msg_t::shared)
499
            refcnt ()->add (refs_);
500
        else {
501
            refcnt ()->set (refs_ + 1);
502
            _u.base.flags |= msg_t::shared;
503 504 505 506
        }
    }
}

507
bool zmq::msg_t::rm_refs (int refs_)
508 509 510
{
    zmq_assert (refs_ >= 0);

511
    //  Operation not supported for messages with metadata.
512
    zmq_assert (_u.base.metadata == NULL);
513

514 515
    //  No copies required.
    if (!refs_)
516 517 518
        return true;

    //  If there's only one reference close the message.
519 520
    if ((_u.base.type != type_zclmsg && _u.base.type != type_lmsg)
        || !(_u.base.flags & msg_t::shared)) {
521 522 523
        close ();
        return false;
    }
524

525
    //  The only message type that needs special care are long and zcopy messages.
526
    if (_u.base.type == type_lmsg && !_u.lmsg.content->refcnt.sub (refs_)) {
Martin Hurton's avatar
Martin Hurton committed
527 528
        //  We used "placement new" operator to initialize the reference
        //  counter so we call the destructor explicitly now.
529
        _u.lmsg.content->refcnt.~atomic_counter_t ();
Martin Hurton's avatar
Martin Hurton committed
530

531 532 533
        if (_u.lmsg.content->ffn)
            _u.lmsg.content->ffn (_u.lmsg.content->data, _u.lmsg.content->hint);
        free (_u.lmsg.content);
534 535 536 537

        return false;
    }

538
    if (is_zcmsg () && !_u.zclmsg.content->refcnt.sub (refs_)) {
539
        // storage for rfcnt is provided externally
540 541 542
        if (_u.zclmsg.content->ffn) {
            _u.zclmsg.content->ffn (_u.zclmsg.content->data,
                                    _u.zclmsg.content->hint);
543
        }
Martin Hurton's avatar
Martin Hurton committed
544

545
        return false;
546
    }
547 548

    return true;
549
}
550

551
uint32_t zmq::msg_t::get_routing_id ()
552
{
553
    return _u.base.routing_id;
554 555
}

556
int zmq::msg_t::set_routing_id (uint32_t routing_id_)
557
{
558
    if (routing_id_) {
559
        _u.base.routing_id = routing_id_;
560 561 562 563
        return 0;
    }
    errno = EINVAL;
    return -1;
564
}
565

566 567
int zmq::msg_t::reset_routing_id ()
{
568
    _u.base.routing_id = 0;
569 570 571
    return 0;
}

572
const char *zmq::msg_t::group ()
somdoron's avatar
somdoron committed
573
{
574
    return _u.base.group;
somdoron's avatar
somdoron committed
575 576
}

577
int zmq::msg_t::set_group (const char *group_)
somdoron's avatar
somdoron committed
578
{
579
    return set_group (group_, ZMQ_GROUP_MAX_LENGTH);
580 581
}

582
int zmq::msg_t::set_group (const char *group_, size_t length_)
583
{
584
    if (length_ > ZMQ_GROUP_MAX_LENGTH) {
somdoron's avatar
somdoron committed
585 586 587 588
        errno = EINVAL;
        return -1;
    }

589 590
    strncpy (_u.base.group, group_, length_);
    _u.base.group[length_] = '\0';
somdoron's avatar
somdoron committed
591 592 593 594

    return 0;
}

595
//  Returns the reference counter of the message content. Only long and
//  zero-copy messages have one; for any other type the assertion fails.
zmq::atomic_counter_t *zmq::msg_t::refcnt ()
{
    if (_u.base.type == type_lmsg)
        return &_u.lmsg.content->refcnt;
    if (_u.base.type == type_zclmsg)
        return &_u.zclmsg.content->refcnt;

    //  No reference counter for this message type.
    zmq_assert (false);
    return NULL;
}