msg.cpp 6.55 KB
Newer Older
1
/*
Martin Sustrik's avatar
Martin Sustrik committed
2
    Copyright (c) 2009-2011 250bpm s.r.o.
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
    Copyright (c) 2007-2011 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "msg.hpp"
23
#include "../include/zmq.h"
24 25 26 27 28 29 30

#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <new>

#include "stdint.hpp"
31
#include "likely.hpp"
32 33
#include "err.hpp"

34 35 36 37 38
//  Check whether the sizes of public representation of the message (zmq_msg_t)
//  and private represenation of the message (zmq::msg_t) match.
typedef char zmq_msg_size_check
    [2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1];

39
bool zmq::msg_t::check ()
40
{
41 42 43 44 45 46 47 48
     return u.base.type >= type_min && u.base.type <= type_max;
}

int zmq::msg_t::init ()
{
    u.vsm.type = type_vsm;
    u.vsm.flags = 0;
    u.vsm.size = 0;
49 50 51
    return 0;
}

52
int zmq::msg_t::init_size (size_t size_)
53
{
54 55 56 57
    if (size_ <= max_vsm_size) {
        u.vsm.type = type_vsm;
        u.vsm.flags = 0;
        u.vsm.size = (unsigned char) size_;
58 59
    }
    else {
60 61 62 63 64
        u.lmsg.type = type_lmsg;
        u.lmsg.flags = 0;
        u.lmsg.content =
            (content_t*) malloc (sizeof (content_t) + size_);
        if (!u.lmsg.content) {
65 66 67
            errno = ENOMEM;
            return -1;
        }
68 69 70 71 72 73

        u.lmsg.content->data = u.lmsg.content + 1;
        u.lmsg.content->size = size_;
        u.lmsg.content->ffn = NULL;
        u.lmsg.content->hint = NULL;
        new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
74 75 76 77
    }
    return 0;
}

78
int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
79
    void *hint_)
80
{
81 82 83
    u.lmsg.type = type_lmsg;
    u.lmsg.flags = 0;
    u.lmsg.content = (content_t*) malloc (sizeof (content_t));
84 85 86 87
    if (!u.lmsg.content) {
        errno = ENOMEM;
        return -1;
    }
88 89 90 91 92 93

    u.lmsg.content->data = data_;
    u.lmsg.content->size = size_;
    u.lmsg.content->ffn = ffn_;
    u.lmsg.content->hint = hint_;
    new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
94
    return 0;
95

96 97
}

98
int zmq::msg_t::init_delimiter ()
99
{
100 101 102 103 104 105 106 107 108
    u.delimiter.type = type_delimiter;
    u.delimiter.flags = 0;
    return 0;
}

int zmq::msg_t::close ()
{
    //  Check the validity of the message.
    if (unlikely (!check ())) {
109 110 111 112
        errno = EFAULT;
        return -1;
    }

113
    if (u.base.type == type_lmsg) {
114

115
        //  If the content is not shared, or if it is shared and the reference
116
        //  count has dropped to zero, deallocate it.
117 118
        if (!(u.lmsg.flags & msg_t::shared) ||
              !u.lmsg.content->refcnt.sub (1)) {
119

120 121 122
            //  We used "placement new" operator to initialize the reference
            //  counter so we call the destructor explicitly now.
            u.lmsg.content->refcnt.~atomic_counter_t ();
123

124 125 126 127
            if (u.lmsg.content->ffn)
                u.lmsg.content->ffn (u.lmsg.content->data,
                    u.lmsg.content->hint);
            free (u.lmsg.content);
128 129 130
        }
    }

131 132
    //  Make the message invalid.
    u.base.type = 0;
133 134

    return 0;
135

136 137
}

138
int zmq::msg_t::move (msg_t &src_)
139
{
140 141
    //  Check the validity of the source.
    if (unlikely (!src_.check ())) {
142 143 144 145
        errno = EFAULT;
        return -1;
    }

146 147 148 149 150 151 152 153 154 155
    int rc = close ();
    if (unlikely (rc < 0))
        return rc;

    *this = src_;

    rc = src_.init ();
    if (unlikely (rc < 0))
        return rc;

156 157 158
    return 0;
}

159
int zmq::msg_t::copy (msg_t &src_)
160
{
161 162
    //  Check the validity of the source.
    if (unlikely (!src_.check ())) {
163 164 165 166
        errno = EFAULT;
        return -1;
    }

167 168 169
    int rc = close ();
    if (unlikely (rc < 0))
        return rc;
170

171
    if (src_.u.base.type == type_lmsg) {
172 173 174

        //  One reference is added to shared messages. Non-shared messages
        //  are turned into shared messages and reference count is set to 2.
175 176
        if (src_.u.lmsg.flags & msg_t::shared)
            src_.u.lmsg.content->refcnt.add (1);
177
        else {
178 179
            src_.u.lmsg.flags |= msg_t::shared;
            src_.u.lmsg.content->refcnt.set (2);
180 181 182
        }
    }

183 184
    *this = src_;

185
    return 0;
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200

}

void *zmq::msg_t::data ()
{
    //  Check the validity of the message.
    zmq_assert (check ());

    switch (u.base.type) {
    case type_vsm:
        return u.vsm.data;
    case type_lmsg:
        return u.lmsg.content->data;
    default:
        zmq_assert (false);
201
        return NULL;
202
    }
203 204
}

205
size_t zmq::msg_t::size ()
206
{
207 208
    //  Check the validity of the message.
    zmq_assert (check ());
209

210 211 212 213 214 215 216
    switch (u.base.type) {
    case type_vsm:
        return u.vsm.size;
    case type_lmsg:
        return u.lmsg.content->size;
    default:
        zmq_assert (false);
217
        return 0;
218 219
    }
}
220

221 222 223
unsigned char zmq::msg_t::flags ()
{
    return u.base.flags;
224 225
}

226
void zmq::msg_t::set_flags (unsigned char flags_)
227
{
228 229
    u.base.flags |= flags_;
}
230

231 232 233 234 235 236 237 238 239 240
void zmq::msg_t::reset_flags (unsigned char flags_)
{
    u.base.flags &= ~flags_;
}

bool zmq::msg_t::is_delimiter ()
{
    return u.base.type == type_delimiter;
}

241 242 243 244 245
bool zmq::msg_t::is_vsm ()
{
    return u.base.type == type_vsm;
}

246 247 248
void zmq::msg_t::add_refs (int refs_)
{
    zmq_assert (refs_ >= 0);
249

250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
    //  No copies required.
    if (!refs_)
        return;

    //  VSMs and delimiters can be copied straight away. The only message type
    //  that needs special care are long messages.
    if (u.base.type == type_lmsg) {
        if (u.lmsg.flags & msg_t::shared)
            u.lmsg.content->refcnt.add (refs_);
        else {
            u.lmsg.content->refcnt.set (refs_ + 1);
            u.lmsg.flags |= msg_t::shared;
        }
    }
}

266
bool zmq::msg_t::rm_refs (int refs_)
267 268 269 270 271
{
    zmq_assert (refs_ >= 0);

    //  No copies required.
    if (!refs_)
272 273 274 275 276 277 278
        return true;

    //  If there's only one reference close the message.
    if (u.base.type != type_lmsg || !(u.lmsg.flags & msg_t::shared)) {
        close ();
        return false;
    }
279 280

    //  The only message type that needs special care are long messages.
281 282 283
    if (!u.lmsg.content->refcnt.sub (refs_)) {
        close ();
        return false;
284
    }
285 286

    return true;
287
}
288