msg.cpp 8.75 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "msg.hpp"
21
#include "../include/zmq.h"
22 23 24 25 26 27

#include <string.h>
#include <stdlib.h>
#include <new>

#include "stdint.hpp"
28
#include "likely.hpp"
29
#include "metadata.hpp"
30 31
#include "err.hpp"

32
//  Check whether the sizes of public representation of the message (zmq_msg_t)
33
//  and private representation of the message (zmq::msg_t) match.
34

35 36 37
typedef char zmq_msg_size_check
    [2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1];

38
bool zmq::msg_t::check ()
39
{
40 41 42 43 44
     return u.base.type >= type_min && u.base.type <= type_max;
}

int zmq::msg_t::init ()
{
45
    u.vsm.metadata = NULL;
46 47 48
    u.vsm.type = type_vsm;
    u.vsm.flags = 0;
    u.vsm.size = 0;
49
    file_desc = -1;
50 51 52
    return 0;
}

53
int zmq::msg_t::init_size (size_t size_)
54
{
55
    file_desc = -1;
56
    if (size_ <= max_vsm_size) {
57
        u.vsm.metadata = NULL;
58 59 60
        u.vsm.type = type_vsm;
        u.vsm.flags = 0;
        u.vsm.size = (unsigned char) size_;
61 62
    }
    else {
63
        u.lmsg.metadata = NULL;
64 65 66 67
        u.lmsg.type = type_lmsg;
        u.lmsg.flags = 0;
        u.lmsg.content =
            (content_t*) malloc (sizeof (content_t) + size_);
68
        if (unlikely (!u.lmsg.content)) {
69 70 71
            errno = ENOMEM;
            return -1;
        }
72 73 74 75 76 77

        u.lmsg.content->data = u.lmsg.content + 1;
        u.lmsg.content->size = size_;
        u.lmsg.content->ffn = NULL;
        u.lmsg.content->hint = NULL;
        new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
78 79 80 81
    }
    return 0;
}

82
int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
83
    void *hint_)
84
{
85 86
    //  If data is NULL and size is not 0, a segfault
    //  would occur once the data is accessed
87
    zmq_assert (data_ != NULL || size_ == 0);
88

89 90
    file_desc = -1;

91
    //  Initialize constant message if there's no need to deallocate
92
    if (ffn_ == NULL) {
93
        u.cmsg.metadata = NULL;
94 95 96 97
        u.cmsg.type = type_cmsg;
        u.cmsg.flags = 0;
        u.cmsg.data = data_;
        u.cmsg.size = size_;
98
    }
99
    else {
100
        u.lmsg.metadata = NULL;
101 102 103 104 105 106 107
        u.lmsg.type = type_lmsg;
        u.lmsg.flags = 0;
        u.lmsg.content = (content_t*) malloc (sizeof (content_t));
        if (!u.lmsg.content) {
            errno = ENOMEM;
            return -1;
        }
108

109 110 111 112 113 114
        u.lmsg.content->data = data_;
        u.lmsg.content->size = size_;
        u.lmsg.content->ffn = ffn_;
        u.lmsg.content->hint = hint_;
        new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
    }
115
    return 0;
116

117 118
}

119
int zmq::msg_t::init_delimiter ()
120
{
121
    u.delimiter.metadata = NULL;
122 123 124 125 126 127 128 129 130
    u.delimiter.type = type_delimiter;
    u.delimiter.flags = 0;
    return 0;
}

int zmq::msg_t::close ()
{
    //  Check the validity of the message.
    if (unlikely (!check ())) {
131 132 133 134
        errno = EFAULT;
        return -1;
    }

135
    if (u.base.type == type_lmsg) {
136

137
        //  If the content is not shared, or if it is shared and the reference
138
        //  count has dropped to zero, deallocate it.
139 140
        if (!(u.lmsg.flags & msg_t::shared) ||
              !u.lmsg.content->refcnt.sub (1)) {
141

142 143 144
            //  We used "placement new" operator to initialize the reference
            //  counter so we call the destructor explicitly now.
            u.lmsg.content->refcnt.~atomic_counter_t ();
145

146 147 148 149
            if (u.lmsg.content->ffn)
                u.lmsg.content->ffn (u.lmsg.content->data,
                    u.lmsg.content->hint);
            free (u.lmsg.content);
150 151 152
        }
    }

153 154 155
    if (u.base.metadata != NULL)
        if (u.base.metadata->drop_ref ())
            delete u.base.metadata;
156

157 158
    //  Make the message invalid.
    u.base.type = 0;
159 160 161 162

    return 0;
}

163
int zmq::msg_t::move (msg_t &src_)
164
{
165 166
    //  Check the validity of the source.
    if (unlikely (!src_.check ())) {
167 168 169 170
        errno = EFAULT;
        return -1;
    }

171 172 173 174 175 176 177 178 179 180
    int rc = close ();
    if (unlikely (rc < 0))
        return rc;

    *this = src_;

    rc = src_.init ();
    if (unlikely (rc < 0))
        return rc;

181 182 183
    return 0;
}

184
int zmq::msg_t::copy (msg_t &src_)
185
{
186 187
    //  Check the validity of the source.
    if (unlikely (!src_.check ())) {
188 189 190 191
        errno = EFAULT;
        return -1;
    }

192 193 194
    int rc = close ();
    if (unlikely (rc < 0))
        return rc;
195

196
    if (src_.u.base.type == type_lmsg) {
197 198 199

        //  One reference is added to shared messages. Non-shared messages
        //  are turned into shared messages and reference count is set to 2.
200 201
        if (src_.u.lmsg.flags & msg_t::shared)
            src_.u.lmsg.content->refcnt.add (1);
202
        else {
203 204
            src_.u.lmsg.flags |= msg_t::shared;
            src_.u.lmsg.content->refcnt.set (2);
205 206 207
        }
    }

208 209
    if (src_.u.base.metadata != NULL)
        src_.u.base.metadata->add_ref ();
210

211 212
    *this = src_;

213
    return 0;
214 215 216 217 218 219 220 221 222 223 224 225 226

}

void *zmq::msg_t::data ()
{
    //  Check the validity of the message.
    zmq_assert (check ());

    switch (u.base.type) {
    case type_vsm:
        return u.vsm.data;
    case type_lmsg:
        return u.lmsg.content->data;
227 228
    case type_cmsg:
        return u.cmsg.data;
229 230
    default:
        zmq_assert (false);
231
        return NULL;
232
    }
233 234
}

235
size_t zmq::msg_t::size ()
236
{
237 238
    //  Check the validity of the message.
    zmq_assert (check ());
239

240 241 242 243 244
    switch (u.base.type) {
    case type_vsm:
        return u.vsm.size;
    case type_lmsg:
        return u.lmsg.content->size;
245 246
    case type_cmsg:
        return u.cmsg.size;
247 248
    default:
        zmq_assert (false);
249
        return 0;
250 251
    }
}
252

253 254 255
unsigned char zmq::msg_t::flags ()
{
    return u.base.flags;
256 257
}

258
void zmq::msg_t::set_flags (unsigned char flags_)
259
{
260 261
    u.base.flags |= flags_;
}
262

263 264 265 266 267
void zmq::msg_t::reset_flags (unsigned char flags_)
{
    u.base.flags &= ~flags_;
}

268
int64_t zmq::msg_t::fd ()
269
{
270
    return file_desc;
271
}
272 273

void zmq::msg_t::set_fd (int64_t fd_)
274
{
275
    file_desc = fd_;
276 277
}

278
zmq::metadata_t *zmq::msg_t::metadata () const
279
{
280
    return u.base.metadata;
281 282
}

283
void zmq::msg_t::set_metadata (zmq::metadata_t *metadata_)
284
{
285 286 287 288
    assert (metadata_ != NULL);
    assert (u.base.metadata == NULL);
    metadata_->add_ref ();
    u.base.metadata = metadata_;
289 290
}

291 292 293 294 295 296 297 298 299
void zmq::msg_t::reset_metadata ()
{
    if (u.base.metadata) {
        if (u.base.metadata->drop_ref ())
            delete u.base.metadata;
        u.base.metadata = NULL;
    }
}

Martin Hurton's avatar
Martin Hurton committed
300 301 302 303 304
bool zmq::msg_t::is_identity () const
{
    return (u.base.flags & identity) == identity;
}

305 306 307 308 309
bool zmq::msg_t::is_credential () const
{
    return (u.base.flags & credential) == credential;
}

310
bool zmq::msg_t::is_delimiter () const
311 312 313 314
{
    return u.base.type == type_delimiter;
}

315 316 317 318 319
bool zmq::msg_t::is_vsm ()
{
    return u.base.type == type_vsm;
}

320 321 322 323 324
bool zmq::msg_t::is_cmsg ()
{
    return u.base.type == type_cmsg;
}

325 326 327
void zmq::msg_t::add_refs (int refs_)
{
    zmq_assert (refs_ >= 0);
328

329 330
    //  Operation not supported for messages with metadata.
    zmq_assert (u.base.metadata == NULL);
331

332 333 334 335
    //  No copies required.
    if (!refs_)
        return;

336 337
    //  VSMs, CMSGS and delimiters can be copied straight away. The only
    //  message type that needs special care are long messages.
338 339 340 341 342 343 344 345 346 347
    if (u.base.type == type_lmsg) {
        if (u.lmsg.flags & msg_t::shared)
            u.lmsg.content->refcnt.add (refs_);
        else {
            u.lmsg.content->refcnt.set (refs_ + 1);
            u.lmsg.flags |= msg_t::shared;
        }
    }
}

348
bool zmq::msg_t::rm_refs (int refs_)
349 350 351
{
    zmq_assert (refs_ >= 0);

352 353
    //  Operation not supported for messages with metadata.
    zmq_assert (u.base.metadata == NULL);
354

355 356
    //  No copies required.
    if (!refs_)
357 358 359 360 361 362 363
        return true;

    //  If there's only one reference close the message.
    if (u.base.type != type_lmsg || !(u.lmsg.flags & msg_t::shared)) {
        close ();
        return false;
    }
364 365

    //  The only message type that needs special care are long messages.
366
    if (!u.lmsg.content->refcnt.sub (refs_)) {
Martin Hurton's avatar
Martin Hurton committed
367 368 369 370 371 372 373 374
        //  We used "placement new" operator to initialize the reference
        //  counter so we call the destructor explicitly now.
        u.lmsg.content->refcnt.~atomic_counter_t ();

        if (u.lmsg.content->ffn)
            u.lmsg.content->ffn (u.lmsg.content->data, u.lmsg.content->hint);
        free (u.lmsg.content);

375
        return false;
376
    }
377 378

    return true;
379
}