/*
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "msg.hpp"
21
#include "../include/zmq.h"
22 23 24 25 26 27

#include <string.h>
#include <stdlib.h>
#include <new>

#include "stdint.hpp"
28
#include "likely.hpp"
29 30
#include "err.hpp"

//  Check whether the sizes of public representation of the message (zmq_msg_t)
//  and private representation of the message (zmq::msg_t) match.

//  Compile-time guard: the array extent is 1 when sizeof (zmq::msg_t) equals
//  sizeof (zmq_msg_t) and -1 (an ill-formed array type) when they differ.
typedef char zmq_msg_size_check
    [(sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) ? 1 : -1];

37
bool zmq::msg_t::check ()
38
{
39 40 41 42 43 44 45 46
     return u.base.type >= type_min && u.base.type <= type_max;
}

int zmq::msg_t::init ()
{
    u.vsm.type = type_vsm;
    u.vsm.flags = 0;
    u.vsm.size = 0;
47
    file_desc = -1;
48 49 50
    return 0;
}

51
int zmq::msg_t::init_size (size_t size_)
52
{
53
    file_desc = -1;
54 55 56 57
    if (size_ <= max_vsm_size) {
        u.vsm.type = type_vsm;
        u.vsm.flags = 0;
        u.vsm.size = (unsigned char) size_;
58 59
    }
    else {
60 61 62 63
        u.lmsg.type = type_lmsg;
        u.lmsg.flags = 0;
        u.lmsg.content =
            (content_t*) malloc (sizeof (content_t) + size_);
64
        if (unlikely (!u.lmsg.content)) {
65 66 67
            errno = ENOMEM;
            return -1;
        }
68 69 70 71 72 73

        u.lmsg.content->data = u.lmsg.content + 1;
        u.lmsg.content->size = size_;
        u.lmsg.content->ffn = NULL;
        u.lmsg.content->hint = NULL;
        new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
74 75 76 77
    }
    return 0;
}

78
int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
79
    void *hint_)
80
{
81 82
    //  If data is NULL and size is not 0, a segfault
    //  would occur once the data is accessed
83
    zmq_assert (data_ != NULL || size_ == 0);
84
    
85 86
    file_desc = -1;

87
    //  Initialize constant message if there's no need to deallocate
88 89 90 91 92
    if(ffn_ == NULL) {
        u.cmsg.type = type_cmsg;
        u.cmsg.flags = 0;
        u.cmsg.data = data_;
        u.cmsg.size = size_;
93
    }
94 95 96 97 98 99 100 101
    else {
        u.lmsg.type = type_lmsg;
        u.lmsg.flags = 0;
        u.lmsg.content = (content_t*) malloc (sizeof (content_t));
        if (!u.lmsg.content) {
            errno = ENOMEM;
            return -1;
        }
102

103 104 105 106 107 108
        u.lmsg.content->data = data_;
        u.lmsg.content->size = size_;
        u.lmsg.content->ffn = ffn_;
        u.lmsg.content->hint = hint_;
        new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
    }
109
    return 0;
110

111 112
}

113
int zmq::msg_t::init_delimiter ()
114
{
115 116 117 118 119 120 121 122 123
    u.delimiter.type = type_delimiter;
    u.delimiter.flags = 0;
    return 0;
}

int zmq::msg_t::close ()
{
    //  Check the validity of the message.
    if (unlikely (!check ())) {
124 125 126 127
        errno = EFAULT;
        return -1;
    }

128
    if (u.base.type == type_lmsg) {
129

130
        //  If the content is not shared, or if it is shared and the reference
131
        //  count has dropped to zero, deallocate it.
132 133
        if (!(u.lmsg.flags & msg_t::shared) ||
              !u.lmsg.content->refcnt.sub (1)) {
134

135 136 137
            //  We used "placement new" operator to initialize the reference
            //  counter so we call the destructor explicitly now.
            u.lmsg.content->refcnt.~atomic_counter_t ();
138

139 140 141 142
            if (u.lmsg.content->ffn)
                u.lmsg.content->ffn (u.lmsg.content->data,
                    u.lmsg.content->hint);
            free (u.lmsg.content);
143 144 145
        }
    }

146 147
    //  Make the message invalid.
    u.base.type = 0;
148 149

    return 0;
150

151 152
}

153
//  Moves the content of src_ into this message and re-initialises src_
//  via init ().
//
//  Returns 0 on success. Returns -1 with errno set to EFAULT when src_ does
//  not pass check (); a negative result of close () or init () is passed
//  through unchanged.
int zmq::msg_t::move (msg_t &src_)
{
    //  The source has to be a valid message.
    if (unlikely (!src_.check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Drop whatever this message currently holds.
    const int close_rc = close ();
    if (unlikely (close_rc < 0))
        return close_rc;

    //  Take over the source's state, then re-initialise the source.
    *this = src_;

    const int init_rc = src_.init ();
    if (unlikely (init_rc < 0))
        return init_rc;

    return 0;
}

174
//  Makes this message a copy of src_.
//
//  The payload of a long message is not duplicated: both messages end up
//  referring to the same content, which is marked as shared and reference
//  counted. Every other message type is copied by plain assignment.
//
//  Returns 0 on success. Returns -1 with errno set to EFAULT when src_ does
//  not pass check (); a negative result of close () is passed through
//  unchanged.
int zmq::msg_t::copy (msg_t &src_)
{
    //  Check the validity of the source.
    if (unlikely (!src_.check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Copying a message onto itself must leave it untouched. Without this
    //  guard the close () below would release the content and invalidate
    //  the message before it is read back as the source.
    if (unlikely (this == &src_))
        return 0;

    //  Drop whatever this message currently holds.
    int rc = close ();
    if (unlikely (rc < 0))
        return rc;

    if (src_.u.base.type == type_lmsg) {

        //  One reference is added to shared messages. Non-shared messages
        //  are turned into shared messages and reference count is set to 2.
        if (src_.u.lmsg.flags & msg_t::shared)
            src_.u.lmsg.content->refcnt.add (1);
        else {
            src_.u.lmsg.flags |= msg_t::shared;
            src_.u.lmsg.content->refcnt.set (2);
        }
    }

    //  The flags were updated on the source first so that the assignment
    //  carries the shared flag over to the copy as well.
    *this = src_;

    return 0;
}

void *zmq::msg_t::data ()
{
    //  Check the validity of the message.
    zmq_assert (check ());

    switch (u.base.type) {
    case type_vsm:
        return u.vsm.data;
    case type_lmsg:
        return u.lmsg.content->data;
214 215
    case type_cmsg:
        return u.cmsg.data;
216 217
    default:
        zmq_assert (false);
218
        return NULL;
219
    }
220 221
}

222
size_t zmq::msg_t::size ()
223
{
224 225
    //  Check the validity of the message.
    zmq_assert (check ());
226

227 228 229 230 231
    switch (u.base.type) {
    case type_vsm:
        return u.vsm.size;
    case type_lmsg:
        return u.lmsg.content->size;
232 233
    case type_cmsg:
        return u.cmsg.size;
234 235
    default:
        zmq_assert (false);
236
        return 0;
237 238
    }
}
239

240 241 242
//  Returns the flag bits currently stored in the message.
unsigned char zmq::msg_t::flags ()
{
    const unsigned char current_flags = u.base.flags;
    return current_flags;
}

245
void zmq::msg_t::set_flags (unsigned char flags_)
246
{
247 248
    u.base.flags |= flags_;
}
249

250 251 252 253 254
//  Turns off every flag bit that is set in flags_; other bits are left
//  unchanged.
void zmq::msg_t::reset_flags (unsigned char flags_)
{
    u.base.flags = static_cast <unsigned char> (u.base.flags & ~flags_);
}

255
int64_t zmq::msg_t::fd ()
256
{
257
    return file_desc;
258
}
259 260

void zmq::msg_t::set_fd (int64_t fd_)
261
{
262
    file_desc = fd_;
263 264
}

Martin Hurton's avatar
Martin Hurton committed
265 266 267 268 269
//  Returns true when all bits of the identity flag are set on the message.
bool zmq::msg_t::is_identity () const
{
    return identity == (u.base.flags & identity);
}

270 271 272 273 274
//  Returns true when all bits of the credential flag are set on the message.
bool zmq::msg_t::is_credential () const
{
    return credential == (u.base.flags & credential);
}

275
bool zmq::msg_t::is_delimiter () const
276 277 278 279
{
    return u.base.type == type_delimiter;
}

280 281 282 283 284
//  Returns true when the message is of type type_vsm.
bool zmq::msg_t::is_vsm ()
{
    return type_vsm == u.base.type;
}

285 286 287 288 289
//  Returns true when the message is of type type_cmsg.
bool zmq::msg_t::is_cmsg ()
{
    return type_cmsg == u.base.type;
}

290 291 292
void zmq::msg_t::add_refs (int refs_)
{
    zmq_assert (refs_ >= 0);
293

294 295 296 297
    //  No copies required.
    if (!refs_)
        return;

298 299
    //  VSMs, CMSGS and delimiters can be copied straight away. The only
    //  message type that needs special care are long messages.
300 301 302 303 304 305 306 307 308 309
    if (u.base.type == type_lmsg) {
        if (u.lmsg.flags & msg_t::shared)
            u.lmsg.content->refcnt.add (refs_);
        else {
            u.lmsg.content->refcnt.set (refs_ + 1);
            u.lmsg.flags |= msg_t::shared;
        }
    }
}

310
bool zmq::msg_t::rm_refs (int refs_)
311 312 313 314 315
{
    zmq_assert (refs_ >= 0);

    //  No copies required.
    if (!refs_)
316 317 318 319 320 321 322
        return true;

    //  If there's only one reference close the message.
    if (u.base.type != type_lmsg || !(u.lmsg.flags & msg_t::shared)) {
        close ();
        return false;
    }
323 324

    //  The only message type that needs special care are long messages.
325
    if (!u.lmsg.content->refcnt.sub (refs_)) {
Martin Hurton's avatar
Martin Hurton committed
326 327 328 329 330 331 332 333
        //  We used "placement new" operator to initialize the reference
        //  counter so we call the destructor explicitly now.
        u.lmsg.content->refcnt.~atomic_counter_t ();

        if (u.lmsg.content->ffn)
            u.lmsg.content->ffn (u.lmsg.content->data, u.lmsg.content->hint);
        free (u.lmsg.content);

334
        return false;
335
    }
336 337

    return true;
338
}
339