mechanism.cpp 12.7 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include <string.h>
32
#include <limits.h>
33 34 35 36 37 38

#include "mechanism.hpp"
#include "options.hpp"
#include "msg.hpp"
#include "err.hpp"
#include "wire.hpp"
39
#include "session_base.hpp"
40

41
zmq::mechanism_t::mechanism_t (const options_t &options_) : options (options_)
42 43 44 45 46 47 48
{
}

zmq::mechanism_t::~mechanism_t ()
{
}

49 50
void zmq::mechanism_t::set_peer_routing_id (const void *id_ptr_,
                                            size_t id_size_)
51
{
52
    _routing_id.set (static_cast<const unsigned char *> (id_ptr_), id_size_);
53 54
}

55
void zmq::mechanism_t::peer_routing_id (msg_t *msg_)
56
{
57
    const int rc = msg_->init_size (_routing_id.size ());
58
    errno_assert (rc == 0);
59
    memcpy (msg_->data (), _routing_id.data (), _routing_id.size ());
60
    msg_->set_flags (msg_t::routing_id);
61 62
}

63
void zmq::mechanism_t::set_user_id (const void *user_id_, size_t size_)
64
{
65
    _user_id.set (static_cast<const unsigned char *> (user_id_), size_);
66
    _zap_properties.ZMQ_MAP_INSERT_OR_EMPLACE (
67
      std::string (ZMQ_MSG_PROPERTY_USER_ID),
68
      std::string (reinterpret_cast<const char *> (user_id_), size_));
69 70
}

71
const zmq::blob_t &zmq::mechanism_t::get_user_id () const
72
{
73
    return _user_id;
74 75
}

76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
const char socket_type_pair[] = "PAIR";
const char socket_type_pub[] = "PUB";
const char socket_type_sub[] = "SUB";
const char socket_type_req[] = "REQ";
const char socket_type_rep[] = "REP";
const char socket_type_dealer[] = "DEALER";
const char socket_type_router[] = "ROUTER";
const char socket_type_pull[] = "PULL";
const char socket_type_push[] = "PUSH";
const char socket_type_xpub[] = "XPUB";
const char socket_type_xsub[] = "XSUB";
const char socket_type_stream[] = "STREAM";
#ifdef ZMQ_BUILD_DRAFT_API
const char socket_type_server[] = "SERVER";
const char socket_type_client[] = "CLIENT";
const char socket_type_radio[] = "RADIO";
const char socket_type_dish[] = "DISH";
const char socket_type_gather[] = "GATHER";
const char socket_type_scatter[] = "SCATTER";
const char socket_type_dgram[] = "DGRAM";
#endif

98
const char *zmq::mechanism_t::socket_type_string (int socket_type_) const
99
{
100
    // TODO the order must of the names must correspond to the values resp. order of ZMQ_* socket type definitions in zmq.h!
101
    static const char *names[] = {
102 103 104 105 106 107 108 109 110 111 112
      socket_type_pair,   socket_type_pub,    socket_type_sub,
      socket_type_req,    socket_type_rep,    socket_type_dealer,
      socket_type_router, socket_type_pull,   socket_type_push,
      socket_type_xpub,   socket_type_xsub,   socket_type_stream,
#ifdef ZMQ_BUILD_DRAFT_API
      socket_type_server, socket_type_client, socket_type_radio,
      socket_type_dish,   socket_type_gather, socket_type_scatter,
      socket_type_dgram
#endif
    };
    static const size_t names_count = sizeof (names) / sizeof (names[0]);
113 114
    zmq_assert (socket_type_ >= 0 && socket_type_ < (int) names_count);
    return names[socket_type_];
115 116
}

117 118 119
const size_t name_len_size = sizeof (unsigned char);
const size_t value_len_size = sizeof (uint32_t);

120
static size_t property_len (size_t name_len_, size_t value_len_)
121
{
122
    return name_len_size + name_len_ + value_len_size + value_len_;
123 124
}

125
static size_t name_len (const char *name_)
126
{
127
    const size_t name_len = strlen (name_);
128
    zmq_assert (name_len <= UCHAR_MAX);
129 130 131
    return name_len;
}

132 133 134 135 136
size_t zmq::mechanism_t::add_property (unsigned char *ptr_,
                                       size_t ptr_capacity_,
                                       const char *name_,
                                       const void *value_,
                                       size_t value_len_)
137
{
138 139 140 141
    const size_t name_len = ::name_len (name_);
    const size_t total_len = ::property_len (name_len, value_len_);
    zmq_assert (total_len <= ptr_capacity_);

142 143
    *ptr_ = static_cast<unsigned char> (name_len);
    ptr_ += name_len_size;
144 145 146 147
    memcpy (ptr_, name_, name_len);
    ptr_ += name_len;
    zmq_assert (value_len_ <= 0x7FFFFFFF);
    put_uint32 (ptr_, static_cast<uint32_t> (value_len_));
148
    ptr_ += value_len_size;
149
    memcpy (ptr_, value_, value_len_);
150

151 152 153
    return total_len;
}

154
size_t zmq::mechanism_t::property_len (const char *name_, size_t value_len_)
155
{
156
    return ::property_len (name_len (name_), value_len_);
157 158
}

159 160 161
#define ZMTP_PROPERTY_SOCKET_TYPE "Socket-Type"
#define ZMTP_PROPERTY_IDENTITY "Identity"

162 163
size_t zmq::mechanism_t::add_basic_properties (unsigned char *ptr_,
                                               size_t ptr_capacity_) const
164
{
165
    unsigned char *ptr = ptr_;
166 167 168

    //  Add socket type property
    const char *socket_type = socket_type_string (options.type);
169
    ptr += add_property (ptr, ptr_capacity_, ZMTP_PROPERTY_SOCKET_TYPE,
170
                         socket_type, strlen (socket_type));
171

172
    //  Add identity (aka routing id) property
173
    if (options.type == ZMQ_REQ || options.type == ZMQ_DEALER
174
        || options.type == ZMQ_ROUTER) {
175
        ptr += add_property (ptr, ptr_capacity_ - (ptr - ptr_),
176 177
                             ZMTP_PROPERTY_IDENTITY, options.routing_id,
                             options.routing_id_size);
178
    }
179

180

181 182 183 184
    for (std::map<std::string, std::string>::const_iterator
           it = options.app_metadata.begin (),
           end = options.app_metadata.end ();
         it != end; ++it) {
185
        ptr +=
186
          add_property (ptr, ptr_capacity_ - (ptr - ptr_), it->first.c_str (),
187
                        it->second.c_str (), strlen (it->second.c_str ()));
188
    }
189

190
    return ptr - ptr_;
191 192
}

193
size_t zmq::mechanism_t::basic_properties_len () const
194 195
{
    const char *socket_type = socket_type_string (options.type);
196
    size_t meta_len = 0;
197

198 199 200 201
    for (std::map<std::string, std::string>::const_iterator
           it = options.app_metadata.begin (),
           end = options.app_metadata.end ();
         it != end; ++it) {
202 203
        meta_len +=
          property_len (it->first.c_str (), strlen (it->second.c_str ()));
204
    }
205

206
    return property_len (ZMTP_PROPERTY_SOCKET_TYPE, strlen (socket_type))
207
           + meta_len
208 209
           + ((options.type == ZMQ_REQ || options.type == ZMQ_DEALER
               || options.type == ZMQ_ROUTER)
210
                ? property_len (ZMTP_PROPERTY_IDENTITY, options.routing_id_size)
211 212 213 214
                : 0);
}

void zmq::mechanism_t::make_command_with_basic_properties (
215
  msg_t *msg_, const char *prefix_, size_t prefix_len_) const
216
{
217
    const size_t command_size = prefix_len_ + basic_properties_len ();
218 219 220
    const int rc = msg_->init_size (command_size);
    errno_assert (rc == 0);

221
    unsigned char *ptr = static_cast<unsigned char *> (msg_->data ());
222 223

    //  Add prefix
224 225
    memcpy (ptr, prefix_, prefix_len_);
    ptr += prefix_len_;
226

227 228
    add_basic_properties (
      ptr, command_size - (ptr - static_cast<unsigned char *> (msg_->data ())));
229
}
230

231
int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_,
232
                                      size_t length_,
233
                                      bool zap_flag_)
234 235 236 237
{
    size_t bytes_left = length_;

    while (bytes_left > 1) {
238
        const size_t name_length = static_cast<size_t> (*ptr_);
239 240
        ptr_ += name_len_size;
        bytes_left -= name_len_size;
241 242 243
        if (bytes_left < name_length)
            break;

244 245
        const std::string name =
          std::string (reinterpret_cast<const char *> (ptr_), name_length);
246 247
        ptr_ += name_length;
        bytes_left -= name_length;
248
        if (bytes_left < value_len_size)
249 250
            break;

251
        const size_t value_length = static_cast<size_t> (get_uint32 (ptr_));
252 253
        ptr_ += value_len_size;
        bytes_left -= value_len_size;
254 255 256 257 258 259 260
        if (bytes_left < value_length)
            break;

        const uint8_t *value = ptr_;
        ptr_ += value_length;
        bytes_left -= value_length;

261
        if (name == ZMTP_PROPERTY_IDENTITY && options.recv_routing_id)
262
            set_peer_routing_id (value, value_length);
263
        else if (name == ZMTP_PROPERTY_SOCKET_TYPE) {
264 265
            if (!check_socket_type (reinterpret_cast<const char *> (value),
                                    value_length)) {
266 267 268
                errno = EINVAL;
                return -1;
            }
269
        } else {
270 271 272 273
            const int rc = property (name, value, value_length);
            if (rc == -1)
                return -1;
        }
274
        (zap_flag_ ? _zap_properties : _zmtp_properties)
275
          .ZMQ_MAP_INSERT_OR_EMPLACE (
276 277
            name,
            std::string (reinterpret_cast<const char *> (value), value_length));
278 279 280 281 282 283 284 285
    }
    if (bytes_left > 0) {
        errno = EPROTO;
        return -1;
    }
    return 0;
}

286 287 288
int zmq::mechanism_t::property (const std::string & /* name_ */,
                                const void * /* value_ */,
                                size_t /* length_ */)
289 290 291 292 293
{
    //  Default implementation does not check
    //  property values and returns 0 to signal success.
    return 0;
}
294

295 296 297 298 299 300 301 302 303 304 305
template <size_t N>
static bool strequals (const char *actual_type_,
                       const size_t actual_len_,
                       const char (&expected_type_)[N])
{
    return actual_len_ == N - 1
           && memcmp (actual_type_, expected_type_, N - 1) == 0;
}

bool zmq::mechanism_t::check_socket_type (const char *type_,
                                          const size_t len_) const
306 307 308
{
    switch (options.type) {
        case ZMQ_REQ:
309 310
            return strequals (type_, len_, socket_type_rep)
                   || strequals (type_, len_, socket_type_router);
311
        case ZMQ_REP:
312 313
            return strequals (type_, len_, socket_type_req)
                   || strequals (type_, len_, socket_type_dealer);
314
        case ZMQ_DEALER:
315 316 317
            return strequals (type_, len_, socket_type_rep)
                   || strequals (type_, len_, socket_type_dealer)
                   || strequals (type_, len_, socket_type_router);
318
        case ZMQ_ROUTER:
319 320 321
            return strequals (type_, len_, socket_type_req)
                   || strequals (type_, len_, socket_type_dealer)
                   || strequals (type_, len_, socket_type_router);
322
        case ZMQ_PUSH:
323
            return strequals (type_, len_, socket_type_pull);
324
        case ZMQ_PULL:
325
            return strequals (type_, len_, socket_type_push);
326
        case ZMQ_PUB:
327 328
            return strequals (type_, len_, socket_type_sub)
                   || strequals (type_, len_, socket_type_xsub);
329
        case ZMQ_SUB:
330 331
            return strequals (type_, len_, socket_type_pub)
                   || strequals (type_, len_, socket_type_xpub);
332
        case ZMQ_XPUB:
333 334
            return strequals (type_, len_, socket_type_sub)
                   || strequals (type_, len_, socket_type_xsub);
335
        case ZMQ_XSUB:
336 337
            return strequals (type_, len_, socket_type_pub)
                   || strequals (type_, len_, socket_type_xpub);
338
        case ZMQ_PAIR:
339 340
            return strequals (type_, len_, socket_type_pair);
#ifdef ZMQ_BUILD_DRAFT_API
341
        case ZMQ_SERVER:
342
            return strequals (type_, len_, socket_type_client);
343
        case ZMQ_CLIENT:
344
            return strequals (type_, len_, socket_type_server);
somdoron's avatar
somdoron committed
345
        case ZMQ_RADIO:
346
            return strequals (type_, len_, socket_type_dish);
somdoron's avatar
somdoron committed
347
        case ZMQ_DISH:
348
            return strequals (type_, len_, socket_type_radio);
somdoron's avatar
somdoron committed
349
        case ZMQ_GATHER:
350
            return strequals (type_, len_, socket_type_scatter);
somdoron's avatar
somdoron committed
351
        case ZMQ_SCATTER:
352
            return strequals (type_, len_, socket_type_gather);
353
        case ZMQ_DGRAM:
354 355
            return strequals (type_, len_, socket_type_dgram);
#endif
356 357 358 359 360
        default:
            break;
    }
    return false;
}