/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32 33 34 35 36 37 38
#include "msg.hpp"

#include <string.h>
#include <stdlib.h>
#include <new>

#include "stdint.hpp"
39
#include "likely.hpp"
40
#include "metadata.hpp"
41 42
#include "err.hpp"

43
//  Check whether the sizes of public representation of the message (zmq_msg_t)
44
//  and private representation of the message (zmq::msg_t) match.
45

46 47 48
typedef char zmq_msg_size_check
    [2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1];

49
bool zmq::msg_t::check ()
50
{
51 52 53
     return u.base.type >= type_min && u.base.type <= type_max;
}

54 55
int zmq::msg_t::init (void* data_, size_t size_,
                      msg_free_fn* ffn_, void* hint,
56
                      content_t* content_)
57
{
58 59 60 61 62 63 64 65 66 67 68 69 70
    if (size_ < max_vsm_size) {
        int const rc = init_size(size_);

        if (rc != -1)
        {
            memcpy(data(), data_, size_);
            return 0;
        }
        else
        {
            return -1;
        }
    }
71
    else if(content_)
72
    {
73
        return init_external_storage(content_, data_, size_, ffn_, hint);
74 75 76
    }
    else
    {
77
        return init_data(data_, size_, ffn_, hint);
78 79 80
    }
}

81 82
int zmq::msg_t::init ()
{
83
    u.vsm.metadata = NULL;
84 85 86
    u.vsm.type = type_vsm;
    u.vsm.flags = 0;
    u.vsm.size = 0;
somdoron's avatar
somdoron committed
87
    u.vsm.group[0] = '\0';
88
    u.vsm.routing_id = 0;
89 90 91
    return 0;
}

92
int zmq::msg_t::init_size (size_t size_)
93
{
94
    if (size_ <= max_vsm_size) {
95
        u.vsm.metadata = NULL;
96 97 98
        u.vsm.type = type_vsm;
        u.vsm.flags = 0;
        u.vsm.size = (unsigned char) size_;
somdoron's avatar
somdoron committed
99
        u.vsm.group[0] = '\0';
100
        u.vsm.routing_id = 0;
101 102
    }
    else {
103
        u.lmsg.metadata = NULL;
104 105
        u.lmsg.type = type_lmsg;
        u.lmsg.flags = 0;
somdoron's avatar
somdoron committed
106
        u.lmsg.group[0] = '\0';
107
        u.lmsg.routing_id = 0;
108 109 110
        u.lmsg.content = NULL;
        if (sizeof (content_t) + size_ > size_)
            u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_);
111
        if (unlikely (!u.lmsg.content)) {
112 113 114
            errno = ENOMEM;
            return -1;
        }
115

116 117 118 119 120
        u.lmsg.content->data = u.lmsg.content + 1;
        u.lmsg.content->size = size_;
        u.lmsg.content->ffn = NULL;
        u.lmsg.content->hint = NULL;
        new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
121 122 123 124
    }
    return 0;
}

125 126
//  Initialises a type_zclmsg message whose content_t header is supplied by
//  the caller instead of being allocated here. Both content_ and data_ must
//  be non-NULL. The header is filled with data_, size_, ffn_ and hint_, and
//  its reference counter is constructed in place. Always returns 0.
int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t size_,
                                      msg_free_fn *ffn_, void* hint_)
{
    zmq_assert(NULL != data_);
    zmq_assert(NULL != content_);

    //  Message header.
    u.zclmsg.type = type_zclmsg;
    u.zclmsg.flags = 0;
    u.zclmsg.metadata = NULL;
    u.zclmsg.routing_id = 0;
    u.zclmsg.group[0] = '\0';

    //  Attach the external content block and describe the payload in it.
    u.zclmsg.content = content_;
    content_t *const ext = u.zclmsg.content;
    ext->data = data_;
    ext->size = size_;
    ext->ffn = ffn_;
    ext->hint = hint_;
    new (&ext->refcnt) zmq::atomic_counter_t ();

    return 0;
}

int zmq::msg_t::init_data (void *data_, size_t size_,
                           msg_free_fn *ffn_, void *hint_)
149
{
150 151
    //  If data is NULL and size is not 0, a segfault
    //  would occur once the data is accessed
152
    zmq_assert (data_ != NULL || size_ == 0);
153

154
    //  Initialize constant message if there's no need to deallocate
155
    if (ffn_ == NULL) {
156
        u.cmsg.metadata = NULL;
157 158 159 160
        u.cmsg.type = type_cmsg;
        u.cmsg.flags = 0;
        u.cmsg.data = data_;
        u.cmsg.size = size_;
somdoron's avatar
somdoron committed
161
        u.cmsg.group[0] = '\0';
162
        u.cmsg.routing_id = 0;
163
    }
164
    else {
165
        u.lmsg.metadata = NULL;
166 167
        u.lmsg.type = type_lmsg;
        u.lmsg.flags = 0;
somdoron's avatar
somdoron committed
168
        u.lmsg.group[0] = '\0';
169
        u.lmsg.routing_id = 0;
170 171 172 173 174
        u.lmsg.content = (content_t*) malloc (sizeof (content_t));
        if (!u.lmsg.content) {
            errno = ENOMEM;
            return -1;
        }
175

176 177 178 179 180
        u.lmsg.content->data = data_;
        u.lmsg.content->size = size_;
        u.lmsg.content->ffn = ffn_;
        u.lmsg.content->hint = hint_;
        new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
181
    }
182
    return 0;
183

184 185
}

186
int zmq::msg_t::init_delimiter ()
187
{
188
    u.delimiter.metadata = NULL;
189 190
    u.delimiter.type = type_delimiter;
    u.delimiter.flags = 0;
somdoron's avatar
somdoron committed
191
    u.delimiter.group[0] = '\0';
192
    u.delimiter.routing_id = 0;
193 194 195
    return 0;
}

196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
//  Turns the message into a join message (type_join) with no metadata,
//  no flags, an empty group and routing id 0. Always returns 0.
int zmq::msg_t::init_join ()
{
    u.base.type = type_join;
    u.base.flags = 0;
    u.base.metadata = NULL;
    u.base.routing_id = 0;
    u.base.group[0] = '\0';
    return 0;
}

//  Turns the message into a leave message (type_leave) with no metadata,
//  no flags, an empty group and routing id 0. Always returns 0.
int zmq::msg_t::init_leave ()
{
    u.base.type = type_leave;
    u.base.flags = 0;
    u.base.metadata = NULL;
    u.base.routing_id = 0;
    u.base.group[0] = '\0';
    return 0;
}

216 217 218 219
int zmq::msg_t::close ()
{
    //  Check the validity of the message.
    if (unlikely (!check ())) {
220 221 222 223
        errno = EFAULT;
        return -1;
    }

224
    if (u.base.type == type_lmsg) {
225

226
        //  If the content is not shared, or if it is shared and the reference
227
        //  count has dropped to zero, deallocate it.
228
        if (!(u.lmsg.flags & msg_t::shared) ||
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
              !u.lmsg.content->refcnt.sub (1)) {

            //  We used "placement new" operator to initialize the reference
            //  counter so we call the destructor explicitly now.
            u.lmsg.content->refcnt.~atomic_counter_t ();

            if (u.lmsg.content->ffn)
                u.lmsg.content->ffn (u.lmsg.content->data,
                    u.lmsg.content->hint);
            free (u.lmsg.content);
        }
    }

    if (is_zcmsg())
    {
244
        zmq_assert( u.zclmsg.content->ffn );
245 246 247 248

        //  If the content is not shared, or if it is shared and the reference
        //  count has dropped to zero, deallocate it.
        if (!(u.zclmsg.flags & msg_t::shared) ||
249
            !u.zclmsg.content->refcnt.sub (1)) {
250 251 252

            //  We used "placement new" operator to initialize the reference
            //  counter so we call the destructor explicitly now.
253
            u.zclmsg.content->refcnt.~atomic_counter_t ();
254

255 256
            u.zclmsg.content->ffn (u.zclmsg.content->data,
                          u.zclmsg.content->hint);
257 258 259
        }
    }

260 261 262 263
    if (u.base.metadata != NULL) {
        if (u.base.metadata->drop_ref ()) {
            LIBZMQ_DELETE(u.base.metadata);
        }
264
        u.base.metadata = NULL;
265
    }
266

267 268
    //  Make the message invalid.
    u.base.type = 0;
269 270 271 272

    return 0;
}

273
//  Transfers the content of src_ into this message. This message is closed
//  first, then takes over src_'s representation, and src_ is re-initialised
//  as an empty message. Returns 0 on success; returns -1 with errno set to
//  EFAULT when src_ is invalid, or the negative result of close ()/init ().
int zmq::msg_t::move (msg_t &src_)
{
    if (unlikely (!src_.check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Drop whatever this message currently holds.
    const int close_rc = close ();
    if (unlikely (close_rc < 0))
        return close_rc;

    *this = src_;

    //  Leave the source as a valid, empty message.
    const int init_rc = src_.init ();
    if (unlikely (init_rc < 0))
        return init_rc;

    return 0;
}

294
//  Makes this message a copy of src_. This message is closed first. For
//  long and zero-copy sources the content is shared rather than duplicated,
//  so src_ is modified (shared flag, reference count). Metadata attached to
//  src_ gains one reference. Returns 0 on success; returns -1 with errno
//  set to EFAULT when src_ is invalid, or the negative result of close ().
int zmq::msg_t::copy (msg_t &src_)
{
    if (unlikely (!src_.check ())) {
        errno = EFAULT;
        return -1;
    }

    const int close_rc = close ();
    if (unlikely (close_rc < 0))
        return close_rc;

    if (src_.u.base.type == type_lmsg) {
        //  First copy of unshared content: mark it shared with two owners.
        //  Already shared content just gains one more owner.
        if (!(src_.u.lmsg.flags & msg_t::shared)) {
            src_.u.lmsg.flags |= msg_t::shared;
            src_.u.lmsg.content->refcnt.set (2);
        }
        else
            src_.u.lmsg.content->refcnt.add (1);
    }

    if (src_.is_zcmsg ()) {
        //  Same scheme for zero-copy messages.
        if (!(src_.u.zclmsg.flags & msg_t::shared)) {
            src_.u.zclmsg.flags |= msg_t::shared;
            src_.refcnt ()->set (2);
        }
        else
            src_.refcnt ()->add (1);
    }

    if (src_.u.base.metadata != NULL)
        src_.u.base.metadata->add_ref ();

    *this = src_;

    return 0;
}

void *zmq::msg_t::data ()
{
    //  Check the validity of the message.
    zmq_assert (check ());

    switch (u.base.type) {
    case type_vsm:
        return u.vsm.data;
    case type_lmsg:
347
        return u.lmsg.content->data;
348 349
    case type_cmsg:
        return u.cmsg.data;
350
    case type_zclmsg:
351
        return u.zclmsg.content->data;
352 353
    default:
        zmq_assert (false);
354
        return NULL;
355
    }
356 357
}

358
size_t zmq::msg_t::size ()
359
{
360 361
    //  Check the validity of the message.
    zmq_assert (check ());
362

363 364 365 366
    switch (u.base.type) {
    case type_vsm:
        return u.vsm.size;
    case type_lmsg:
367 368
        return u.lmsg.content->size;
    case type_zclmsg:
369
        return u.zclmsg.content->size;
370 371
    case type_cmsg:
        return u.cmsg.size;
372 373
    default:
        zmq_assert (false);
374
        return 0;
375 376
    }
}
377

378 379 380
//  Returns the message's current flag bits.
unsigned char zmq::msg_t::flags ()
{
    const unsigned char current = u.base.flags;
    return current;
}

383
void zmq::msg_t::set_flags (unsigned char flags_)
384
{
385 386
    u.base.flags |= flags_;
}
387

388 389 390 391 392
//  Turns off every flag bit present in flags_; other bits are left as is.
void zmq::msg_t::reset_flags (unsigned char flags_)
{
    u.base.flags = (unsigned char) (u.base.flags & ~flags_);
}

393
//  Returns the metadata pointer stored in the message (may be NULL).
zmq::metadata_t *zmq::msg_t::metadata () const
{
    zmq::metadata_t *const attached = u.base.metadata;
    return attached;
}

398
//  Attaches metadata_ to a message that has none yet and takes one
//  reference on it. Both preconditions are checked with assert.
void zmq::msg_t::set_metadata (zmq::metadata_t *metadata_)
{
    assert (metadata_ != NULL);
    assert (u.base.metadata == NULL);

    u.base.metadata = metadata_;
    u.base.metadata->add_ref ();
}

406 407 408
void zmq::msg_t::reset_metadata ()
{
    if (u.base.metadata) {
409
        if (u.base.metadata->drop_ref ()) {
410
            LIBZMQ_DELETE(u.base.metadata);
411 412
        }
        u.base.metadata = NULL;
413 414 415
    }
}

Martin Hurton's avatar
Martin Hurton committed
416 417 418 419 420
//  True when all bits of the identity flag are set on the message.
bool zmq::msg_t::is_identity () const
{
    const int masked = u.base.flags & identity;
    return masked == identity;
}

421 422 423 424 425
//  True when all bits of the credential flag are set on the message.
bool zmq::msg_t::is_credential () const
{
    const int masked = u.base.flags & credential;
    return masked == credential;
}

426
bool zmq::msg_t::is_delimiter () const
427 428 429 430
{
    return u.base.type == type_delimiter;
}

431
bool zmq::msg_t::is_vsm () const
432 433 434 435
{
    return u.base.type == type_vsm;
}

436
bool zmq::msg_t::is_cmsg () const
437
{
438
    return u.base.type == type_cmsg;
439 440
}

441
bool zmq::msg_t::is_zcmsg() const
442
{
443
    return u.base.type == type_zclmsg;
444 445
}

446 447 448 449 450 451 452 453 454 455
//  True when the message's type tag is type_join.
bool zmq::msg_t::is_join() const
{
    return type_join == u.base.type;
}

//  True when the message's type tag is type_leave.
bool zmq::msg_t::is_leave() const
{
    return type_leave == u.base.type;
}

456 457 458
void zmq::msg_t::add_refs (int refs_)
{
    zmq_assert (refs_ >= 0);
459

460 461
    //  Operation not supported for messages with metadata.
    zmq_assert (u.base.metadata == NULL);
462

463 464 465 466
    //  No copies required.
    if (!refs_)
        return;

467 468
    //  VSMs, CMSGS and delimiters can be copied straight away. The only
    //  message type that needs special care are long messages.
469 470 471
    if (u.base.type == type_lmsg || is_zcmsg() ) {
        if (u.base.flags & msg_t::shared)
            refcnt()->add (refs_);
472
        else {
473 474
            refcnt()->set (refs_ + 1);
            u.base.flags |= msg_t::shared;
475 476 477 478
        }
    }
}

479
bool zmq::msg_t::rm_refs (int refs_)
480 481 482
{
    zmq_assert (refs_ >= 0);

483 484
    //  Operation not supported for messages with metadata.
    zmq_assert (u.base.metadata == NULL);
485

486 487
    //  No copies required.
    if (!refs_)
488 489 490
        return true;

    //  If there's only one reference close the message.
491
    if ( (u.base.type != type_zclmsg && u.base.type != type_lmsg) || !(u.base.flags & msg_t::shared)) {
492 493 494
        close ();
        return false;
    }
495

496
    //  The only message type that needs special care are long and zcopy messages.
497
    if (u.base.type == type_lmsg && !u.lmsg.content->refcnt.sub(refs_)) {
Martin Hurton's avatar
Martin Hurton committed
498 499
        //  We used "placement new" operator to initialize the reference
        //  counter so we call the destructor explicitly now.
500
        u.lmsg.content->refcnt.~atomic_counter_t ();
Martin Hurton's avatar
Martin Hurton committed
501

502 503 504 505 506 507 508
        if (u.lmsg.content->ffn)
            u.lmsg.content->ffn (u.lmsg.content->data, u.lmsg.content->hint);
        free (u.lmsg.content);

        return false;
    }

509
    if (is_zcmsg() && !u.zclmsg.content->refcnt.sub(refs_)) {
510
        // storage for rfcnt is provided externally
511 512
        if (u.zclmsg.content->ffn) {
            u.zclmsg.content->ffn(u.zclmsg.content->data, u.zclmsg.content->hint);
513
        }
Martin Hurton's avatar
Martin Hurton committed
514

515
        return false;
516
    }
517 518

    return true;
519
}
520

521
uint32_t zmq::msg_t::get_routing_id ()
522 523 524 525
{
    return u.base.routing_id;
}

526
int zmq::msg_t::set_routing_id (uint32_t routing_id_)
527
{
528 529 530 531 532 533
    if (routing_id_) {
        u.base.routing_id = routing_id_;
        return 0;
    }
    errno = EINVAL;
    return -1;
534
}
535

536 537 538 539 540 541
//  Clears the routing id by setting it back to 0. Always returns 0.
int zmq::msg_t::reset_routing_id ()
{
    const uint32_t no_routing_id = 0;
    u.base.routing_id = no_routing_id;
    return 0;
}

somdoron's avatar
somdoron committed
542 543 544 545 546 547 548
//  Returns the group name stored inside the message as a C string.
const char * zmq::msg_t::group ()
{
    const char *const name = u.base.group;
    return name;
}

int zmq::msg_t::set_group (const char * group_)
{
549 550 551 552 553 554
    return set_group (group_, strlen (group_));
}

//  Sets the message group from the first length_ characters of group_ and
//  NUL-terminates it at index length_. Returns 0 on success. When length_
//  exceeds ZMQ_GROUP_MAX_LENGTH nothing is stored, errno is set to EINVAL
//  and -1 is returned.
int zmq::msg_t::set_group (const char * group_, size_t length_)
{
    if (length_ <= ZMQ_GROUP_MAX_LENGTH) {
        strncpy (u.base.group, group_, length_);
        u.base.group[length_] = '\0';
        return 0;
    }

    errno = EINVAL;
    return -1;
}

566
//  Returns the reference counter that lives in the content block of a long
//  or zero-copy message. Asserts for every other message type.
zmq::atomic_counter_t *zmq::msg_t::refcnt()
{
    if (u.base.type == type_lmsg)
        return &u.lmsg.content->refcnt;
    if (u.base.type == type_zclmsg)
        return &u.zclmsg.content->refcnt;

    //  No other message type has a reference counter.
    zmq_assert(false);
    return NULL;
}