msg.cpp 6.53 KB
Newer Older
1
/*
Pieter Hintjens's avatar
Pieter Hintjens committed
2
    Copyright (c) 2007-2012 iMatix Corporation
Martin Sustrik's avatar
Martin Sustrik committed
3
    Copyright (c) 2009-2011 250bpm s.r.o.
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "msg.hpp"
23
#include "../include/zmq.h"
24 25 26 27 28 29

#include <string.h>
#include <stdlib.h>
#include <new>

#include "stdint.hpp"
30
#include "likely.hpp"
31 32
#include "err.hpp"

33 34 35 36 37
//  Compile-time check that the public representation of the message
//  (zmq_msg_t) and the private representation of the message (zmq::msg_t)
//  have the same size. If the sizes differ the array below gets a negative
//  size and the translation unit fails to compile.
typedef char zmq_msg_size_check
    [sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];

38
bool zmq::msg_t::check ()
39
{
40 41 42 43 44 45 46 47
     return u.base.type >= type_min && u.base.type <= type_max;
}

//  Initialises the message as a VSM of size zero with no flags set.
//  Always returns 0.
int zmq::msg_t::init ()
{
    u.vsm.size = 0;
    u.vsm.flags = 0;
    u.vsm.type = type_vsm;
    return 0;
}

51
int zmq::msg_t::init_size (size_t size_)
52
{
53 54 55 56
    if (size_ <= max_vsm_size) {
        u.vsm.type = type_vsm;
        u.vsm.flags = 0;
        u.vsm.size = (unsigned char) size_;
57 58
    }
    else {
59 60 61 62 63
        u.lmsg.type = type_lmsg;
        u.lmsg.flags = 0;
        u.lmsg.content =
            (content_t*) malloc (sizeof (content_t) + size_);
        if (!u.lmsg.content) {
64 65 66
            errno = ENOMEM;
            return -1;
        }
67 68 69 70 71 72

        u.lmsg.content->data = u.lmsg.content + 1;
        u.lmsg.content->size = size_;
        u.lmsg.content->ffn = NULL;
        u.lmsg.content->hint = NULL;
        new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
73 74 75 76
    }
    return 0;
}

77
int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
78
    void *hint_)
79
{
80 81 82
    u.lmsg.type = type_lmsg;
    u.lmsg.flags = 0;
    u.lmsg.content = (content_t*) malloc (sizeof (content_t));
83 84 85 86
    if (!u.lmsg.content) {
        errno = ENOMEM;
        return -1;
    }
87 88 89 90 91 92

    u.lmsg.content->data = data_;
    u.lmsg.content->size = size_;
    u.lmsg.content->ffn = ffn_;
    u.lmsg.content->hint = hint_;
    new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
93
    return 0;
94

95 96
}

97
int zmq::msg_t::init_delimiter ()
98
{
99 100 101 102 103 104 105 106 107
    u.delimiter.type = type_delimiter;
    u.delimiter.flags = 0;
    return 0;
}

int zmq::msg_t::close ()
{
    //  Check the validity of the message.
    if (unlikely (!check ())) {
108 109 110 111
        errno = EFAULT;
        return -1;
    }

112
    if (u.base.type == type_lmsg) {
113

114
        //  If the content is not shared, or if it is shared and the reference
115
        //  count has dropped to zero, deallocate it.
116 117
        if (!(u.lmsg.flags & msg_t::shared) ||
              !u.lmsg.content->refcnt.sub (1)) {
118

119 120 121
            //  We used "placement new" operator to initialize the reference
            //  counter so we call the destructor explicitly now.
            u.lmsg.content->refcnt.~atomic_counter_t ();
122

123 124 125 126
            if (u.lmsg.content->ffn)
                u.lmsg.content->ffn (u.lmsg.content->data,
                    u.lmsg.content->hint);
            free (u.lmsg.content);
127 128 129
        }
    }

130 131
    //  Make the message invalid.
    u.base.type = 0;
132 133

    return 0;
134

135 136
}

137
//  Moves the content of 'src_' into this message. The previous content of
//  this message is closed first and 'src_' is re-initialised via init ()
//  afterwards.
//  Returns 0 on success. Returns -1 with errno set to EFAULT if 'src_' fails
//  the validity check; a failure reported by close () or init () is passed
//  through unchanged.
int zmq::msg_t::move (msg_t &src_)
{
    //  Check the validity of the source.
    if (unlikely (!src_.check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Moving a message onto itself is a no-op. Without this guard the
    //  close () below would discard the payload before it could be moved and
    //  the message would silently end up empty.
    if (this == &src_)
        return 0;

    //  Release whatever this message currently holds.
    int rc = close ();
    if (unlikely (rc < 0))
        return rc;

    //  Take over the representation of the source as is.
    *this = src_;

    //  Re-initialise the source so that it no longer refers to the content
    //  now held by this message.
    rc = src_.init ();
    if (unlikely (rc < 0))
        return rc;

    return 0;
}

158
//  Makes this message a copy of 'src_'. The previous content of this
//  message is closed first. Long messages are not duplicated: source and
//  copy refer to the same content, and 'src_' is marked as shared.
//  Returns 0 on success. Returns -1 with errno set to EFAULT if 'src_' fails
//  the validity check; a failure reported by close () is passed through
//  unchanged.
int zmq::msg_t::copy (msg_t &src_)
{
    //  Check the validity of the source.
    if (unlikely (!src_.check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Copying a message onto itself is a no-op. Without this guard the
    //  close () below would release the content and invalidate the message
    //  before anything could be copied from it.
    if (this == &src_)
        return 0;

    //  Release whatever this message currently holds.
    int rc = close ();
    if (unlikely (rc < 0))
        return rc;

    if (src_.u.base.type == type_lmsg) {

        //  One reference is added to shared messages. Non-shared messages
        //  are turned into shared messages and reference count is set to 2.
        if (src_.u.lmsg.flags & msg_t::shared)
            src_.u.lmsg.content->refcnt.add (1);
        else {
            src_.u.lmsg.flags |= msg_t::shared;
            src_.u.lmsg.content->refcnt.set (2);
        }
    }

    //  Copy the representation, including the shared flag set above.
    *this = src_;

    return 0;
}

//  Returns a pointer to the payload of the message. Asserts that the
//  message is valid and that it is either a VSM or a long message.
void *zmq::msg_t::data ()
{
    //  The message has to be valid.
    zmq_assert (check ());

    //  VSMs keep the payload inside the message itself.
    if (u.base.type == type_vsm)
        return u.vsm.data;

    //  Long messages keep it in the content block.
    if (u.base.type == type_lmsg)
        return u.lmsg.content->data;

    //  No other message type may be asked for its data.
    zmq_assert (false);
    return NULL;
}

204
size_t zmq::msg_t::size ()
205
{
206 207
    //  Check the validity of the message.
    zmq_assert (check ());
208

209 210 211 212 213 214 215
    switch (u.base.type) {
    case type_vsm:
        return u.vsm.size;
    case type_lmsg:
        return u.lmsg.content->size;
    default:
        zmq_assert (false);
216
        return 0;
217 218
    }
}
219

220 221 222
//  Returns the flags currently stored in the message.
unsigned char zmq::msg_t::flags ()
{
    const unsigned char current = u.base.flags;
    return current;
}

225
void zmq::msg_t::set_flags (unsigned char flags_)
226
{
227 228
    u.base.flags |= flags_;
}
229

230 231 232 233 234 235 236 237 238 239
//  Switches off every flag present in 'flags_'; other flags are untouched.
void zmq::msg_t::reset_flags (unsigned char flags_)
{
    u.base.flags = u.base.flags & ~flags_;
}

//  Returns true if the type tag of the message is type_delimiter.
bool zmq::msg_t::is_delimiter ()
{
    const bool delimiter = (type_delimiter == u.base.type);
    return delimiter;
}

240 241 242 243 244
//  Returns true if the type tag of the message is type_vsm.
bool zmq::msg_t::is_vsm ()
{
    const bool vsm = (type_vsm == u.base.type);
    return vsm;
}

245 246 247
//  Adds 'refs_' references to the message. 'refs_' must not be negative.
//  Only long messages are affected; a long message that was not shared
//  becomes shared with a reference count of 'refs_' + 1.
void zmq::msg_t::add_refs (int refs_)
{
    zmq_assert (refs_ >= 0);

    //  Nothing to do when no references are requested. VSMs and delimiters
    //  can be copied straight away, so they need no bookkeeping either.
    if (refs_ == 0 || u.base.type != type_lmsg)
        return;

    //  A message that is not shared yet becomes shared here.
    if (!(u.lmsg.flags & msg_t::shared)) {
        u.lmsg.content->refcnt.set (refs_ + 1);
        u.lmsg.flags |= msg_t::shared;
        return;
    }

    //  Already shared: just bump the counter.
    u.lmsg.content->refcnt.add (refs_);
}

265
bool zmq::msg_t::rm_refs (int refs_)
266 267 268 269 270
{
    zmq_assert (refs_ >= 0);

    //  No copies required.
    if (!refs_)
271 272 273 274 275 276 277
        return true;

    //  If there's only one reference close the message.
    if (u.base.type != type_lmsg || !(u.lmsg.flags & msg_t::shared)) {
        close ();
        return false;
    }
278 279

    //  The only message type that needs special care are long messages.
280 281 282
    if (!u.lmsg.content->refcnt.sub (refs_)) {
        close ();
        return false;
283
    }
284 285

    return true;
286
}
287