zmq.cpp 40.1 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
Luca Boccassi's avatar
Luca Boccassi committed
29 30 31 32 33 34 35 36 37

// "Tell them I was a writer.
//  A maker of software.
//  A humanist. A father.
//  And many things.
//  But above all, a writer.
//  Thank You. :)"
//  - Pieter Hintjens

38
#include "precompiled.hpp"
39
#define ZMQ_TYPE_UNSAFE
Martin Sustrik's avatar
Martin Sustrik committed
40

41
#include "macros.hpp"
42
#include "poller.hpp"
43

44
#if !defined ZMQ_HAVE_POLLER
45 46 47 48
//  On AIX platform, poll.h has to be included first to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
49
#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS
50 51 52
#include <poll.h>
#endif

53 54 55
#include "polling_util.hpp"
#endif

56
// TODO: determine if this is an issue, since zmq.h is being loaded from pch.
AJ Lewis's avatar
AJ Lewis committed
57
// zmq.h must be included *after* poll.h for AIX to build properly
58
//#include "../include/zmq.h"
AJ Lewis's avatar
AJ Lewis committed
59

60
#if !defined ZMQ_HAVE_WINDOWS
61
#include <unistd.h>
62 63 64
#ifdef ZMQ_HAVE_VXWORKS
#include <strings.h>
#endif
65 66
#endif

skaller's avatar
skaller committed
67
// XSI vector I/O
68
#if defined ZMQ_HAVE_UIO
skaller's avatar
skaller committed
69 70
#include <sys/uio.h>
#else
71 72
//  Fallback definition used when <sys/uio.h> is not available: one
//  scatter/gather element consisting of a buffer pointer and its length.
struct iovec
{
    void *iov_base; //  start of the buffer
    size_t iov_len; //  number of bytes at iov_base
};
#endif

78
#include <string.h>
Martin Sustrik's avatar
Martin Sustrik committed
79 80
#include <stdlib.h>
#include <new>
81
#include <climits>
Martin Sustrik's avatar
Martin Sustrik committed
82

83
#include "proxy.hpp"
84
#include "socket_base.hpp"
85
#include "stdint.hpp"
86
#include "config.hpp"
87
#include "likely.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
88
#include "clock.hpp"
89
#include "ctx.hpp"
90
#include "err.hpp"
91
#include "msg.hpp"
92
#include "fd.hpp"
93
#include "metadata.hpp"
94
#include "socket_poller.hpp"
somdoron's avatar
somdoron committed
95
#include "timers.hpp"
96
#include "ip.hpp"
97
#include "address.hpp"
98

99
#if defined ZMQ_HAVE_OPENPGM
100
#define __PGM_WININT_H__
101 102 103
#include <pgm/pgm.h>
#endif

104
//  Compile time check whether msg_t fits into zmq_msg_t.
105 106
//  The array bound evaluates to -1 when the two sizes differ, which makes
//  this typedef ill-formed and therefore stops the build.
typedef char
  check_msg_t_size[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];
107

108

109 110
void zmq_version (int *major_, int *minor_, int *patch_)
{
Martin Sustrik's avatar
Martin Sustrik committed
111 112 113
    *major_ = ZMQ_VERSION_MAJOR;
    *minor_ = ZMQ_VERSION_MINOR;
    *patch_ = ZMQ_VERSION_PATCH;
114 115
}

116

117 118
const char *zmq_strerror (int errnum_)
{
119
    return zmq::errno_to_string (errnum_);
120 121
}

122
//  Returns the current value of errno unchanged.
int zmq_errno (void)
{
    const int current = errno;
    return current;
}


128
//  New context API
Martin Sustrik's avatar
Martin Sustrik committed
129

130 131
void *zmq_ctx_new (void)
{
132
    //  We do this before the ctx constructor since its embedded mailbox_t
133 134 135 136
    //  object needs the network to be up and running (at least on Windows).
    if (!zmq::initialize_network ()) {
        return NULL;
    }
137

138
    //  Create 0MQ context.
139
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
140 141 142 143 144 145
    if (ctx) {
        if (!ctx->valid ()) {
            delete ctx;
            return NULL;
        }
    }
146 147 148
    return ctx;
}

149
int zmq_ctx_term (void *ctx_)
Martin Sustrik's avatar
Martin Sustrik committed
150
{
151
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
152 153 154
        errno = EFAULT;
        return -1;
    }
155

156
    int rc = (static_cast<zmq::ctx_t *> (ctx_))->terminate ();
157 158
    int en = errno;

159 160
    //  Shut down only if termination was not interrupted by a signal.
    if (!rc || en != EINTR) {
161
        zmq::shutdown_network ();
162
    }
163 164 165

    errno = en;
    return rc;
Martin Sustrik's avatar
Martin Sustrik committed
166 167
}

168 169
int zmq_ctx_shutdown (void *ctx_)
{
170
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
171 172 173
        errno = EFAULT;
        return -1;
    }
174
    return (static_cast<zmq::ctx_t *> (ctx_))->shutdown ();
175 176
}

177 178
int zmq_ctx_set (void *ctx_, int option_, int optval_)
{
179
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
180 181 182
        errno = EFAULT;
        return -1;
    }
183
    return (static_cast<zmq::ctx_t *> (ctx_))->set (option_, optval_);
184 185 186 187
}

int zmq_ctx_get (void *ctx_, int option_)
{
188
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
189 190 191
        errno = EFAULT;
        return -1;
    }
192
    return (static_cast<zmq::ctx_t *> (ctx_))->get (option_);
193 194 195 196 197 198
}

//  Stable/legacy context API

void *zmq_init (int io_threads_)
{
199 200 201 202 203 204
    if (io_threads_ >= 0) {
        void *ctx = zmq_ctx_new ();
        zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
        return ctx;
    }
    errno = EINVAL;
205
    return NULL;
206 207 208 209
}

int zmq_term (void *ctx_)
{
210 211 212 213 214 215
    return zmq_ctx_term (ctx_);
}

//  Alias that forwards to zmq_ctx_term ().
int zmq_ctx_destroy (void *ctx_)
{
    const int rc = zmq_ctx_term (ctx_);
    return rc;
}


// Sockets
220

221 222 223 224 225 226 227 228 229 230
//  Converts an opaque socket handle into a socket_base_t pointer.
//  Returns NULL with errno set to ENOTSOCK when s_ is NULL or the object
//  fails its tag check.
static zmq::socket_base_t *as_socket_base_t (void *s_)
{
    if (s_ != NULL) {
        zmq::socket_base_t *const sock =
          static_cast<zmq::socket_base_t *> (s_);
        if (sock->check_tag ())
            return sock;
    }
    errno = ENOTSOCK;
    return NULL;
}

231
void *zmq_socket (void *ctx_, int type_)
Martin Sustrik's avatar
Martin Sustrik committed
232
{
233
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
234 235 236
        errno = EFAULT;
        return NULL;
    }
237
    zmq::ctx_t *ctx = static_cast<zmq::ctx_t *> (ctx_);
238
    zmq::socket_base_t *s = ctx->create_socket (type_);
239
    return (void *) s;
Martin Sustrik's avatar
Martin Sustrik committed
240 241
}

Martin Sustrik's avatar
Martin Sustrik committed
242
int zmq_close (void *s_)
Martin Sustrik's avatar
Martin Sustrik committed
243
{
244 245
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
246
        return -1;
247
    s->close ();
Martin Sustrik's avatar
Martin Sustrik committed
248 249 250
    return 0;
}

251 252 253 254
int zmq_setsockopt (void *s_,
                    int option_,
                    const void *optval_,
                    size_t optvallen_)
Martin Sustrik's avatar
Martin Sustrik committed
255
{
256 257
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
258
        return -1;
259
    return s->setsockopt (option_, optval_, optvallen_);
Martin Sustrik's avatar
Martin Sustrik committed
260 261
}

262 263
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
264 265
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
266
        return -1;
267
    return s->getsockopt (option_, optval_, optvallen_);
268 269
}

270 271
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
{
272 273
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
274
        return -1;
275
    return s->monitor (addr_, events_);
276 277
}

278
int zmq_join (void *s_, const char *group_)
somdoron's avatar
somdoron committed
279
{
280 281
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
somdoron's avatar
somdoron committed
282
        return -1;
283
    return s->join (group_);
somdoron's avatar
somdoron committed
284 285
}

286
int zmq_leave (void *s_, const char *group_)
somdoron's avatar
somdoron committed
287
{
288 289
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
somdoron's avatar
somdoron committed
290
        return -1;
291
    return s->leave (group_);
somdoron's avatar
somdoron committed
292 293
}

294
int zmq_bind (void *s_, const char *addr_)
Martin Sustrik's avatar
Martin Sustrik committed
295
{
296 297
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
298
        return -1;
299
    return s->bind (addr_);
300 301 302 303
}

int zmq_connect (void *s_, const char *addr_)
{
304 305
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
306
        return -1;
307
    return s->connect (addr_);
skaller's avatar
skaller committed
308 309
}

310
int zmq_unbind (void *s_, const char *addr_)
311
{
312 313
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
314
        return -1;
315
    return s->term_endpoint (addr_);
316 317
}

318
int zmq_disconnect (void *s_, const char *addr_)
319
{
320 321
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
322
        return -1;
323
    return s->term_endpoint (addr_);
324 325
}

326 327
// Sending functions.

328
static inline int
329
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
skaller's avatar
skaller committed
330
{
Min RK's avatar
Min RK committed
331
    size_t sz = zmq_msg_size (msg_);
332
    int rc = s_->send (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
skaller's avatar
skaller committed
333 334
    if (unlikely (rc < 0))
        return -1;
335 336 337 338 339 340

    //  This is what I'd like to do, my C++ fu is too weak -- PH 2016/02/09
    //  int max_msgsz = s_->parent->get (ZMQ_MAX_MSGSZ);
    size_t max_msgsz = INT_MAX;

    //  Truncate returned size to INT_MAX to avoid overflow to negative values
341
    return static_cast<int> (sz < max_msgsz ? sz : max_msgsz);
skaller's avatar
skaller committed
342 343
}

344
/*  To be deprecated once zmq_msg_send() is stable                           */
skaller's avatar
skaller committed
345 346
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
347
    return zmq_msg_send (msg_, s_, flags_);
Martin Sustrik's avatar
Martin Sustrik committed
348 349
}

350
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
351
{
352 353
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
skaller's avatar
skaller committed
354
        return -1;
355
    zmq_msg_t msg;
356
    if (zmq_msg_init_size (&msg, len_))
357 358
        return -1;

359 360 361 362 363 364
    //  We explicitly allow a send from NULL, size zero
    if (len_) {
        assert (buf_);
        memcpy (zmq_msg_data (&msg), buf_, len_);
    }
    int rc = s_sendmsg (s, &msg, flags_);
365
    if (unlikely (rc < 0)) {
366 367 368 369 370 371 372 373
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }
    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
374
    return rc;
Uli Köhler's avatar
Uli Köhler committed
375 376
}

377
int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
Uli Köhler's avatar
Uli Köhler committed
378
{
379 380
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
Uli Köhler's avatar
Uli Köhler committed
381 382
        return -1;
    zmq_msg_t msg;
383 384
    int rc =
      zmq_msg_init_data (&msg, const_cast<void *> (buf_), len_, NULL, NULL);
385
    if (rc != 0)
Uli Köhler's avatar
Uli Köhler committed
386 387
        return -1;

388 389
    rc = s_sendmsg (s, &msg, flags_);
    if (unlikely (rc < 0)) {
Uli Köhler's avatar
Uli Köhler committed
390 391 392 393 394 395 396 397
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }
    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
398
    return rc;
399 400
}

401

skaller's avatar
skaller committed
402
// Send multiple messages.
403
// TODO: this function has no man page
skaller's avatar
skaller committed
404 405 406
//
// If flag bit ZMQ_SNDMORE is set the vector is treated as
// a single multi-part message, i.e. the last message has
skaller's avatar
skaller committed
407
// ZMQ_SNDMORE bit switched off.
skaller's avatar
skaller committed
408
//
409
int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
skaller's avatar
skaller committed
410
{
411 412
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
skaller's avatar
skaller committed
413
        return -1;
414 415 416 417 418
    if (unlikely (count_ <= 0 || !a_)) {
        errno = EINVAL;
        return -1;
    }

skaller's avatar
skaller committed
419 420
    int rc = 0;
    zmq_msg_t msg;
421

422
    for (size_t i = 0; i < count_; ++i) {
skaller's avatar
skaller committed
423
        rc = zmq_msg_init_size (&msg, a_[i].iov_len);
424
        if (rc != 0) {
skaller's avatar
skaller committed
425 426 427 428
            rc = -1;
            break;
        }
        memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
429 430
        if (i == count_ - 1)
            flags_ = flags_ & ~ZMQ_SNDMORE;
431 432
        rc = s_sendmsg (s, &msg, flags_);
        if (unlikely (rc < 0)) {
433 434 435 436 437 438
            int err = errno;
            int rc2 = zmq_msg_close (&msg);
            errno_assert (rc2 == 0);
            errno = err;
            rc = -1;
            break;
skaller's avatar
skaller committed
439 440
        }
    }
441
    return rc;
skaller's avatar
skaller committed
442 443
}

444 445
// Receiving functions.

446
//  Receives one message from s_ into msg_.
//  Returns the size of the received message, capped at INT_MAX, or -1
//  when the receive fails.
static int s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
    if (unlikely (s_->recv (reinterpret_cast<zmq::msg_t *> (msg_), flags_)
                  < 0))
        return -1;

    //  Cap the reported size so it cannot overflow to a negative int.
    const size_t size = zmq_msg_size (msg_);
    if (size < INT_MAX)
        return static_cast<int> (size);
    return static_cast<int> (INT_MAX);
}

457
/*  To be deprecated once zmq_msg_recv() is stable                           */
skaller's avatar
skaller committed
458 459
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
460
    return zmq_msg_recv (msg_, s_, flags_);
skaller's avatar
skaller committed
461 462 463
}


464
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
465
{
466 467
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
skaller's avatar
skaller committed
468
        return -1;
469 470 471 472
    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg);
    errno_assert (rc == 0);

473
    int nbytes = s_recvmsg (s, &msg, flags_);
474
    if (unlikely (nbytes < 0)) {
475
        int err = errno;
476 477
        rc = zmq_msg_close (&msg);
        errno_assert (rc == 0);
478 479 480 481
        errno = err;
        return -1;
    }

482
    //  An oversized message is silently truncated.
483
    size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
484

485 486 487 488 489
    //  We explicitly allow a null buffer argument if len is zero
    if (to_copy) {
        assert (buf_);
        memcpy (buf_, zmq_msg_data (&msg), to_copy);
    }
490 491 492
    rc = zmq_msg_close (&msg);
    errno_assert (rc == 0);

493
    return nbytes;
494 495
}

skaller's avatar
skaller committed
496
// Receive a multi-part message
497
//
skaller's avatar
skaller committed
498 499 500 501 502 503 504 505 506 507
// Receives up to *count_ parts of a multi-part message.
// Sets *count_ to the actual number of parts read.
// ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
// Returns number of message parts read, or -1 on error.
//
// Note: even if -1 is returned, some parts of the message
// may have been read. Therefore the client must consult
// *count_ to retrieve message parts successfully read,
// even if -1 is returned.
//
508
// The iov_base buffer of every iovec in a_ that this function fills in is
// allocated with malloc() and must be released by the caller using free().
510
// TODO: this function has no man page
skaller's avatar
skaller committed
511
//
512
int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
skaller's avatar
skaller committed
513
{
514 515
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
skaller's avatar
skaller committed
516
        return -1;
517 518 519 520 521
    if (unlikely (!count_ || *count_ <= 0 || !a_)) {
        errno = EINVAL;
        return -1;
    }

522
    size_t count = *count_;
skaller's avatar
skaller committed
523 524
    int nread = 0;
    bool recvmore = true;
525

526
    *count_ = 0;
skaller's avatar
skaller committed
527

528
    for (size_t i = 0; recvmore && i < count; ++i) {
skaller's avatar
skaller committed
529 530 531 532
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        errno_assert (rc == 0);

533
        int nbytes = s_recvmsg (s, &msg, flags_);
skaller's avatar
skaller committed
534 535 536 537 538 539 540 541 542 543
        if (unlikely (nbytes < 0)) {
            int err = errno;
            rc = zmq_msg_close (&msg);
            errno_assert (rc == 0);
            errno = err;
            nread = -1;
            break;
        }

        a_[i].iov_len = zmq_msg_size (&msg);
544
        a_[i].iov_base = static_cast<char *> (malloc (a_[i].iov_len));
545
        if (unlikely (!a_[i].iov_base)) {
546 547 548
            errno = ENOMEM;
            return -1;
        }
549 550
        memcpy (a_[i].iov_base, static_cast<char *> (zmq_msg_data (&msg)),
                a_[i].iov_len);
skaller's avatar
skaller committed
551
        // Assume zmq_socket ZMQ_RVCMORE is properly set.
552
        const zmq::msg_t *p_msg = reinterpret_cast<const zmq::msg_t *> (&msg);
553 554
        recvmore = p_msg->flags () & zmq::msg_t::more;
        rc = zmq_msg_close (&msg);
555 556 557
        errno_assert (rc == 0);
        ++*count_;
        ++nread;
skaller's avatar
skaller committed
558
    }
559
    return nread;
skaller's avatar
skaller committed
560 561
}

562 563
// Message manipulators.

564 565
//  Forwards to msg_t::init () on the message stored in msg_.
int zmq_msg_init (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->init ();
}

//  Forwards to msg_t::init_size () with the requested size_.
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->init_size (size_);
}

574 575
//  Forwards to msg_t::init_data () with the caller-supplied buffer, its
//  size, the free function ffn_ and its hint_ argument.
int zmq_msg_init_data (
  zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->init_data (data_, size_, ffn_, hint_);
}

581
//  Sends msg_ on the socket s_ through s_sendmsg ().
//  Returns -1 when s_ is not a valid socket.
int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *const sock = as_socket_base_t (s_);
    return sock ? s_sendmsg (sock, msg_, flags_) : -1;
}

589
//  Receives a message from the socket s_ into msg_ through s_recvmsg ().
//  Returns -1 when s_ is not a valid socket.
int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *const sock = as_socket_base_t (s_);
    return sock ? s_recvmsg (sock, msg_, flags_) : -1;
}

597 598
//  Forwards to msg_t::close () on the message stored in msg_.
int zmq_msg_close (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->close ();
}

//  Forwards to msg_t::move (), with dest_ as the target and src_ as the
//  source message.
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *const target = reinterpret_cast<zmq::msg_t *> (dest_);
    zmq::msg_t *const source = reinterpret_cast<zmq::msg_t *> (src_);
    return target->move (*source);
}

//  Forwards to msg_t::copy (), with dest_ as the target and src_ as the
//  source message.
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *const target = reinterpret_cast<zmq::msg_t *> (dest_);
    zmq::msg_t *const source = reinterpret_cast<zmq::msg_t *> (src_);
    return target->copy (*source);
}

//  Returns the data pointer reported by msg_t::data () for msg_.
void *zmq_msg_data (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->data ();
}

619
size_t zmq_msg_size (const zmq_msg_t *msg_)
620
{
621
    return ((zmq::msg_t *) msg_)->size ();
622 623
}

624
int zmq_msg_more (const zmq_msg_t *msg_)
625
{
626
    return zmq_msg_get (msg_, ZMQ_MORE);
627 628
}

629
int zmq_msg_get (const zmq_msg_t *msg_, int property_)
630
{
631
    const char *fd_string;
632

633
    switch (property_) {
634
        case ZMQ_MORE:
635
            return (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::more) ? 1 : 0;
636
        case ZMQ_SRCFD:
637
            fd_string = zmq_msg_gets (msg_, "__fd");
638
            if (fd_string == NULL)
639
                return -1;
640

641
            return atoi (fd_string);
642
        case ZMQ_SHARED:
643 644 645 646
            return (((zmq::msg_t *) msg_)->is_cmsg ())
                       || (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::shared)
                     ? 1
                     : 0;
647
        default:
648 649
            errno = EINVAL;
            return -1;
650 651 652
    }
}

653
//  No message property can be set at present, so every call fails with
//  errno set to EINVAL.
int zmq_msg_set (zmq_msg_t *, int, int)
{
    const int failure = -1;
    errno = EINVAL;
    return failure;
}

660 661
//  Forwards to msg_t::set_routing_id () with routing_id_.
int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->set_routing_id (routing_id_);
}

666
//  Returns the value reported by msg_t::get_routing_id () for msg_.
uint32_t zmq_msg_routing_id (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->get_routing_id ();
}
670

somdoron's avatar
somdoron committed
671 672
//  Forwards to msg_t::set_group () with the group name group_.
int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->set_group (group_);
}

//  Returns the value reported by msg_t::group () for msg_.
const char *zmq_msg_group (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->group ();
}

681 682
//  Get message metadata string

683
const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_)
684
{
685 686
    const zmq::metadata_t *metadata =
      reinterpret_cast<const zmq::msg_t *> (msg_)->metadata ();
687
    const char *value = NULL;
688
    if (metadata)
689 690 691
        value = metadata->get (std::string (property_));
    if (value)
        return value;
692 693 694

    errno = EINVAL;
    return NULL;
695 696
}

697
    // Polling.
698

699 700 701 702 703
#if defined ZMQ_HAVE_POLLER
//  Implements zmq_poll on top of the zmq_poller API.
//  Registers every entry of items_ with a temporary socket_poller_t, waits
//  up to timeout_ for events and writes the fired events back into
//  items_[i].revents.
//  Returns the value of zmq_poller_wait_all (), 0 when the wait fails with
//  EAGAIN, or a negative value on any other failure. Invalid arguments
//  yield -1 with errno set to EINVAL (negative nitems_) or EFAULT (NULL
//  items_ with a positive nitems_).
inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
    //  A negative count must not reach the array allocation below, where
    //  it would end in alloc_assert instead of an error return. Use the
    //  same errno values as the non-poller implementation of zmq_poll.
    if (nitems_ < 0) {
        errno = EINVAL;
        return -1;
    }
    if (nitems_ > 0 && !items_) {
        errno = EFAULT;
        return -1;
    }

    int rc;
    zmq_poller_event_t *events;
    zmq::socket_poller_t poller;
    events = new (std::nothrow) zmq_poller_event_t[nitems_];
    alloc_assert (events);

    bool repeat_items = false;
    //  Register sockets with poller
    for (int i = 0; i < nitems_; i++) {
        items_[i].revents = 0;

        bool modify = false;
        short e = items_[i].events;
        if (items_[i].socket) {
            //  Poll item is a 0MQ socket.
            for (int j = 0; j < i; ++j) {
                //  A socket listed earlier is already registered: merge
                //  the event masks and modify instead of adding again.
                if (items_[j].socket == items_[i].socket) {
                    repeat_items = true;
                    modify = true;
                    e |= items_[j].events;
                }
            }
            if (modify) {
                rc = zmq_poller_modify (&poller, items_[i].socket, e);
            } else {
                rc = zmq_poller_add (&poller, items_[i].socket, NULL, e);
            }
            if (rc < 0) {
                delete[] events;
                return rc;
            }
        } else {
            //  Poll item is a raw file descriptor.
            for (int j = 0; j < i; ++j) {
                //  Same duplicate handling as above, keyed on the fd.
                if (!items_[j].socket && items_[j].fd == items_[i].fd) {
                    repeat_items = true;
                    modify = true;
                    e |= items_[j].events;
                }
            }
            if (modify) {
                rc = zmq_poller_modify_fd (&poller, items_[i].fd, e);
            } else {
                rc = zmq_poller_add_fd (&poller, items_[i].fd, NULL, e);
            }
            if (rc < 0) {
                delete[] events;
                return rc;
            }
        }
    }

    //  Wait for events
    rc = zmq_poller_wait_all (&poller, events, nitems_, timeout_);
    if (rc < 0) {
        delete[] events;
        //  EAGAIN from the wait is reported to the caller as "no events".
        if (zmq_errno () == EAGAIN) {
            return 0;
        }
        return rc;
    }

    //  Transform poller events into zmq_pollitem events.
    //  items_ contains all items, while events only contains fired events.
    //  If no sockets are repeated (likely), the two are still co-ordered,
    //  so step through the items checking for matches only on the first
    //  event not yet consumed.
    //  If there are repeat items, they cannot be assumed to be co-ordered,
    //  so each pollitem must check fired events from the beginning.
    int j_start = 0, found_events = rc;
    for (int i = 0; i < nitems_; i++) {
        for (int j = j_start; j < found_events; ++j) {
            if ((items_[i].socket && items_[i].socket == events[j].socket)
                || (!(items_[i].socket || events[j].socket)
                    && items_[i].fd == events[j].fd)) {
                //  Report only the events this particular item asked for.
                items_[i].revents = events[j].events & items_[i].events;
                if (!repeat_items) {
                    //  no repeats, we can ignore events we've already seen
                    j_start++;
                }
                break;
            }
            if (!repeat_items) {
                //  no repeats, never have to look at j > j_start
                break;
            }
        }
    }

    //  Cleanup
    delete[] events;
    return rc;
}
#endif // ZMQ_HAVE_POLLER

799 800
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
801 802
//  TODO: the function implementation can just call zmq_pollfd_poll with
//  pollfd as NULL, however pollfd is not yet stable.
803 804
#if defined ZMQ_HAVE_POLLER
    // if poller is present, use that.
805
    return zmq_poller_poll (items_, nitems_, timeout_);
806
#else
807
#if defined ZMQ_POLL_BASED_ON_POLL || defined ZMQ_POLL_BASED_ON_SELECT
808 809 810 811 812 813 814 815 816 817
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
        return 0;
818 819 820 821 822
#elif defined ZMQ_HAVE_VXWORKS
        struct timespec ns_;
        ns_.tv_sec = timeout_ / 1000;
        ns_.tv_nsec = timeout_ % 1000 * 1000000;
        return nanosleep (&ns_, 0);
823 824 825 826 827 828 829 830 831
#else
        return usleep (timeout_ * 1000);
#endif
    }
    if (!items_) {
        errno = EFAULT;
        return -1;
    }

832 833 834 835
    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;
#if defined ZMQ_POLL_BASED_ON_POLL
836
    zmq::fast_vector_t<pollfd, ZMQ_POLLITEMS_DFLT> pollfds (nitems_);
837 838 839 840 841

    //  Build pollset for poll () system call.
    for (int i = 0; i != nitems_; i++) {
        //  If the poll item is a 0MQ socket, we poll on the file descriptor
        //  retrieved by the ZMQ_FD socket option.
842
        if (items_[i].socket) {
843
            size_t zmq_fd_size = sizeof (zmq::fd_t);
844 845 846
            if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &pollfds[i].fd,
                                &zmq_fd_size)
                == -1) {
847 848
                return -1;
            }
849
            pollfds[i].events = items_[i].events ? POLLIN : 0;
850 851 852 853
        }
        //  Else, the poll item is a raw file descriptor. Just convert the
        //  events to normal POLLIN/POLLOUT for poll ().
        else {
854 855 856 857 858
            pollfds[i].fd = items_[i].fd;
            pollfds[i].events =
              (items_[i].events & ZMQ_POLLIN ? POLLIN : 0)
              | (items_[i].events & ZMQ_POLLOUT ? POLLOUT : 0)
              | (items_[i].events & ZMQ_POLLPRI ? POLLPRI : 0);
859 860
        }
    }
861 862 863
#else
    //  Ensure we do not attempt to select () on more than FD_SETSIZE
    //  file descriptors.
864
    //  TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here
865 866
    zmq_assert (nitems_ <= FD_SETSIZE);

867
    zmq::optimized_fd_set_t pollset_in (nitems_);
868
    FD_ZERO (pollset_in.get ());
869
    zmq::optimized_fd_set_t pollset_out (nitems_);
870
    FD_ZERO (pollset_out.get ());
871
    zmq::optimized_fd_set_t pollset_err (nitems_);
872
    FD_ZERO (pollset_err.get ());
873 874 875 876 877 878 879 880 881 882 883 884 885 886 887

    zmq::fd_t maxfd = 0;

    //  Build the fd_sets for passing to select ().
    for (int i = 0; i != nitems_; i++) {
        //  If the poll item is a 0MQ socket we are interested in input on the
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
        if (items_[i].socket) {
            size_t zmq_fd_size = sizeof (zmq::fd_t);
            zmq::fd_t notify_fd;
            if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &notify_fd,
                                &zmq_fd_size)
                == -1)
                return -1;
            if (items_[i].events) {
888
                FD_SET (notify_fd, pollset_in.get ());
889 890 891 892 893 894 895 896
                if (maxfd < notify_fd)
                    maxfd = notify_fd;
            }
        }
        //  Else, the poll item is a raw file descriptor. Convert the poll item
        //  events to the appropriate fd_sets.
        else {
            if (items_[i].events & ZMQ_POLLIN)
897
                FD_SET (items_[i].fd, pollset_in.get ());
898
            if (items_[i].events & ZMQ_POLLOUT)
899
                FD_SET (items_[i].fd, pollset_out.get ());
900
            if (items_[i].events & ZMQ_POLLERR)
901
                FD_SET (items_[i].fd, pollset_err.get ());
902 903 904 905 906
            if (maxfd < items_[i].fd)
                maxfd = items_[i].fd;
        }
    }

907 908 909
    zmq::optimized_fd_set_t inset (nitems_);
    zmq::optimized_fd_set_t outset (nitems_);
    zmq::optimized_fd_set_t errset (nitems_);
910
#endif
911 912 913 914 915

    bool first_pass = true;
    int nevents = 0;

    while (true) {
916 917
#if defined ZMQ_POLL_BASED_ON_POLL

918
        //  Compute the timeout for the subsequent poll.
919 920
        zmq::timeout_t timeout =
          zmq::compute_timeout (first_pass, timeout_, now, end);
921 922

        //  Wait for events.
923
        {
924
            int rc = poll (&pollfds[0], nitems_, timeout);
925 926 927 928 929 930 931
            if (rc == -1 && errno == EINTR) {
                return -1;
            }
            errno_assert (rc >= 0);
        }
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
932
            items_[i].revents = 0;
933 934 935

            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
936
            if (items_[i].socket) {
937 938
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
939 940 941
                if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
                                    &zmq_events_size)
                    == -1) {
942 943
                    return -1;
                }
944 945 946 947 948 949
                if ((items_[i].events & ZMQ_POLLOUT)
                    && (zmq_events & ZMQ_POLLOUT))
                    items_[i].revents |= ZMQ_POLLOUT;
                if ((items_[i].events & ZMQ_POLLIN)
                    && (zmq_events & ZMQ_POLLIN))
                    items_[i].revents |= ZMQ_POLLIN;
950 951 952 953
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
954 955 956 957 958 959 960 961
                if (pollfds[i].revents & POLLIN)
                    items_[i].revents |= ZMQ_POLLIN;
                if (pollfds[i].revents & POLLOUT)
                    items_[i].revents |= ZMQ_POLLOUT;
                if (pollfds[i].revents & POLLPRI)
                    items_[i].revents |= ZMQ_POLLPRI;
                if (pollfds[i].revents & ~(POLLIN | POLLOUT | POLLPRI))
                    items_[i].revents |= ZMQ_POLLERR;
962 963
            }

964
            if (items_[i].revents)
965 966 967 968 969 970 971 972 973 974 975 976
                nevents++;
        }

#else

        //  Compute the timeout for the subsequent poll.
        timeval timeout;
        timeval *ptimeout;
        if (first_pass) {
            timeout.tv_sec = 0;
            timeout.tv_usec = 0;
            ptimeout = &timeout;
977
        } else if (timeout_ < 0)
978 979
            ptimeout = NULL;
        else {
980 981
            timeout.tv_sec = static_cast<long> ((end - now) / 1000);
            timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
982 983 984 985 986
            ptimeout = &timeout;
        }

        //  Wait for events. Ignore interrupts if there's infinite timeout.
        while (true) {
987
            memcpy (inset.get (), pollset_in.get (),
988
                    zmq::valid_pollset_bytes (*pollset_in.get ()));
989
            memcpy (outset.get (), pollset_out.get (),
990
                    zmq::valid_pollset_bytes (*pollset_out.get ()));
991
            memcpy (errset.get (), pollset_err.get (),
992
                    zmq::valid_pollset_bytes (*pollset_err.get ()));
993
#if defined ZMQ_HAVE_WINDOWS
994 995
            int rc =
              select (0, inset.get (), outset.get (), errset.get (), ptimeout);
996 997 998 999 1000 1001
            if (unlikely (rc == SOCKET_ERROR)) {
                errno = zmq::wsa_error_to_errno (WSAGetLastError ());
                wsa_assert (errno == ENOTSOCK);
                return -1;
            }
#else
1002 1003
            int rc = select (maxfd + 1, inset.get (), outset.get (),
                             errset.get (), ptimeout);
1004 1005 1006 1007 1008 1009 1010 1011 1012 1013
            if (unlikely (rc == -1)) {
                errno_assert (errno == EINTR || errno == EBADF);
                return -1;
            }
#endif
            break;
        }

        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
1014
            items_[i].revents = 0;
1015 1016 1017

            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
1018
            if (items_[i].socket) {
1019 1020
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
1021 1022 1023
                if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
                                    &zmq_events_size)
                    == -1)
1024
                    return -1;
1025 1026 1027 1028 1029 1030
                if ((items_[i].events & ZMQ_POLLOUT)
                    && (zmq_events & ZMQ_POLLOUT))
                    items_[i].revents |= ZMQ_POLLOUT;
                if ((items_[i].events & ZMQ_POLLIN)
                    && (zmq_events & ZMQ_POLLIN))
                    items_[i].revents |= ZMQ_POLLIN;
1031 1032 1033 1034
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
1035
                if (FD_ISSET (items_[i].fd, inset.get ()))
1036
                    items_[i].revents |= ZMQ_POLLIN;
1037
                if (FD_ISSET (items_[i].fd, outset.get ()))
1038
                    items_[i].revents |= ZMQ_POLLOUT;
1039
                if (FD_ISSET (items_[i].fd, errset.get ()))
1040
                    items_[i].revents |= ZMQ_POLLERR;
1041 1042
            }

1043
            if (items_[i].revents)
1044 1045
                nevents++;
        }
1046
#endif
1047

1048
        //  If timeout is zero, exit immediately whether there are events or not.
1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079
        if (timeout_ == 0)
            break;

        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
            continue;
        }

        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling has begun. (We assume that
        //  the first pass has taken negligible time). We also compute the
        //  time when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
            end = now + timeout_;
            if (now == end)
                break;
            first_pass = false;
            continue;
        }

        //  Find out whether the timeout has expired.
        now = clock.now_ms ();
        if (now >= end)
1080
            break;
1081 1082 1083 1084 1085 1086 1087 1088
    }

    return nevents;
#else
    //  Exotic platforms that support neither poll() nor select().
    errno = ENOTSUP;
    return -1;
#endif
1089
#endif // ZMQ_HAVE_POLLER
1090
}
1091

1092 1093
//  The poller functionality

1094
void *zmq_poller_new (void)
1095 1096
{
    zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t;
1097 1098 1099
    if (!poller) {
        errno = ENOMEM;
    }
1100 1101 1102
    return poller;
}

1103
int zmq_poller_destroy (void **poller_p_)
1104
{
1105 1106 1107 1108 1109 1110 1111 1112
    if (poller_p_) {
        zmq::socket_poller_t *const poller =
          static_cast<zmq::socket_poller_t *> (*poller_p_);
        if (poller && poller->check_tag ()) {
            delete poller;
            *poller_p_ = NULL;
            return 0;
        }
1113
    }
1114 1115
    errno = EFAULT;
    return -1;
1116 1117
}

1118

1119
static int check_poller (void *const poller_)
1120
{
1121 1122
    if (!poller_
        || !(static_cast<zmq::socket_poller_t *> (poller_))->check_tag ()) {
1123 1124 1125 1126
        errno = EFAULT;
        return -1;
    }

1127 1128 1129
    return 0;
}

1130 1131 1132 1133 1134 1135 1136 1137 1138
//  Validates an event mask: only the ZMQ_POLLIN, ZMQ_POLLOUT, ZMQ_POLLERR
//  and ZMQ_POLLPRI bits may be set. Returns 0 if the mask is acceptable;
//  otherwise sets errno to EINVAL and returns -1.
static int check_events (const short events_)
{
    const int supported = ZMQ_POLLIN | ZMQ_POLLOUT | ZMQ_POLLERR | ZMQ_POLLPRI;

    if ((events_ & ~supported) == 0)
        return 0;

    errno = EINVAL;
    return -1;
}

1139 1140 1141 1142 1143
static int check_poller_registration_args (void *const poller_, void *const s_)
{
    if (-1 == check_poller (poller_))
        return -1;

1144
    if (!s_ || !(static_cast<zmq::socket_base_t *> (s_))->check_tag ()) {
1145 1146 1147
        errno = ENOTSOCK;
        return -1;
    }
1148 1149 1150 1151 1152

    return 0;
}

static int check_poller_fd_registration_args (void *const poller_,
1153
                                              const zmq::fd_t fd_)
1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167
{
    if (-1 == check_poller (poller_))
        return -1;

    if (fd_ == zmq::retired_fd) {
        errno = EBADF;
        return -1;
    }

    return 0;
}

int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_)
{
1168 1169
    if (-1 == check_poller_registration_args (poller_, s_)
        || -1 == check_events (events_))
1170 1171
        return -1;

1172
    zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1173

1174
    return (static_cast<zmq::socket_poller_t *> (poller_))
1175
      ->add (socket, user_data_, events_);
1176 1177
}

1178
int zmq_poller_add_fd (void *poller_,
1179
                       zmq::fd_t fd_,
1180 1181
                       void *user_data_,
                       short events_)
1182
{
1183 1184
    if (-1 == check_poller_fd_registration_args (poller_, fd_)
        || -1 == check_events (events_))
1185
        return -1;
1186

1187
    return (static_cast<zmq::socket_poller_t *> (poller_))
1188
      ->add_fd (fd_, user_data_, events_);
1189 1190
}

1191

1192
int zmq_poller_modify (void *poller_, void *s_, short events_)
1193
{
1194 1195
    if (-1 == check_poller_registration_args (poller_, s_)
        || -1 == check_events (events_))
1196 1197
        return -1;

1198
    zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1199

1200 1201
    return (static_cast<zmq::socket_poller_t *> (poller_))
      ->modify (socket, events_);
1202 1203
}

1204
int zmq_poller_modify_fd (void *poller_, zmq::fd_t fd_, short events_)
1205
{
1206 1207
    if (-1 == check_poller_fd_registration_args (poller_, fd_)
        || -1 == check_events (events_))
1208
        return -1;
1209

1210 1211
    return (static_cast<zmq::socket_poller_t *> (poller_))
      ->modify_fd (fd_, events_);
1212 1213
}

1214
int zmq_poller_remove (void *poller_, void *s_)
1215
{
1216
    if (-1 == check_poller_registration_args (poller_, s_))
1217 1218
        return -1;

1219
    zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1220

1221
    return (static_cast<zmq::socket_poller_t *> (poller_))->remove (socket);
1222 1223
}

1224
int zmq_poller_remove_fd (void *poller_, zmq::fd_t fd_)
1225
{
1226
    if (-1 == check_poller_fd_registration_args (poller_, fd_))
1227
        return -1;
1228

1229
    return (static_cast<zmq::socket_poller_t *> (poller_))->remove_fd (fd_);
1230
}
somdoron's avatar
somdoron committed
1231

1232
int zmq_poller_wait (void *poller_, zmq_poller_event_t *event_, long timeout_)
1233
{
1234
    int rc = zmq_poller_wait_all (poller_, event_, 1, timeout_);
1235

1236 1237 1238
    if (rc < 0 && event_) {
        // TODO this is not portable... zmq_poller_event_t contains pointers,
        // for which nullptr does not need to be represented by all-zeroes
1239
        memset (event_, 0, sizeof (zmq_poller_event_t));
1240
    }
1241 1242
    // wait_all returns number of events, but we return 0 for any success
    return rc >= 0 ? 0 : rc;
1243 1244
}

1245 1246
int zmq_poller_wait_all (void *poller_,
                         zmq_poller_event_t *events_,
1247
                         int n_events_,
1248
                         long timeout_)
1249
{
1250
    if (-1 == check_poller (poller_))
1251
        return -1;
1252

1253 1254 1255 1256
    if (!events_) {
        errno = EFAULT;
        return -1;
    }
1257
    if (n_events_ < 0) {
1258 1259 1260
        errno = EINVAL;
        return -1;
    }
1261

1262
    int rc =
1263 1264
      (static_cast<zmq::socket_poller_t *> (poller_))
        ->wait (reinterpret_cast<zmq::socket_poller_t::event_t *> (events_),
1265
                n_events_, timeout_);
1266 1267 1268 1269

    return rc;
}

1270 1271
//  Peer-specific state

1272
int zmq_socket_get_peer_state (void *s_,
1273 1274
                               const void *routing_id_,
                               size_t routing_id_size_)
1275
{
1276
    const zmq::socket_base_t *const s = as_socket_base_t (s_);
1277 1278 1279
    if (!s)
        return -1;

1280
    return s->get_peer_state (routing_id_, routing_id_size_);
1281 1282
}

somdoron's avatar
somdoron committed
1283 1284
//  Timers

1285
void *zmq_timers_new (void)
somdoron's avatar
somdoron committed
1286 1287 1288 1289 1290 1291
{
    zmq::timers_t *timers = new (std::nothrow) zmq::timers_t;
    alloc_assert (timers);
    return timers;
}

1292
int zmq_timers_destroy (void **timers_p_)
somdoron's avatar
somdoron committed
1293
{
1294
    void *timers = *timers_p_;
1295
    if (!timers || !(static_cast<zmq::timers_t *> (timers))->check_tag ()) {
somdoron's avatar
somdoron committed
1296 1297 1298
        errno = EFAULT;
        return -1;
    }
1299
    delete (static_cast<zmq::timers_t *> (timers));
1300
    *timers_p_ = NULL;
somdoron's avatar
somdoron committed
1301 1302 1303
    return 0;
}

1304 1305 1306 1307
int zmq_timers_add (void *timers_,
                    size_t interval_,
                    zmq_timer_fn handler_,
                    void *arg_)
somdoron's avatar
somdoron committed
1308
{
1309
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
somdoron's avatar
somdoron committed
1310 1311 1312 1313
        errno = EFAULT;
        return -1;
    }

1314 1315
    return (static_cast<zmq::timers_t *> (timers_))
      ->add (interval_, handler_, arg_);
somdoron's avatar
somdoron committed
1316 1317 1318 1319
}

int zmq_timers_cancel (void *timers_, int timer_id_)
{
1320
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
somdoron's avatar
somdoron committed
1321 1322 1323 1324
        errno = EFAULT;
        return -1;
    }

1325
    return (static_cast<zmq::timers_t *> (timers_))->cancel (timer_id_);
somdoron's avatar
somdoron committed
1326 1327 1328 1329
}

int zmq_timers_set_interval (void *timers_, int timer_id_, size_t interval_)
{
1330
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
somdoron's avatar
somdoron committed
1331 1332 1333 1334
        errno = EFAULT;
        return -1;
    }

1335 1336
    return (static_cast<zmq::timers_t *> (timers_))
      ->set_interval (timer_id_, interval_);
somdoron's avatar
somdoron committed
1337 1338 1339 1340
}

int zmq_timers_reset (void *timers_, int timer_id_)
{
1341
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
somdoron's avatar
somdoron committed
1342 1343 1344 1345
        errno = EFAULT;
        return -1;
    }

1346
    return (static_cast<zmq::timers_t *> (timers_))->reset (timer_id_);
somdoron's avatar
somdoron committed
1347 1348 1349 1350
}

long zmq_timers_timeout (void *timers_)
{
1351
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
somdoron's avatar
somdoron committed
1352 1353 1354 1355
        errno = EFAULT;
        return -1;
    }

1356
    return (static_cast<zmq::timers_t *> (timers_))->timeout ();
somdoron's avatar
somdoron committed
1357 1358 1359 1360
}

int zmq_timers_execute (void *timers_)
{
1361
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
somdoron's avatar
somdoron committed
1362 1363 1364 1365
        errno = EFAULT;
        return -1;
    }

1366
    return (static_cast<zmq::timers_t *> (timers_))->execute ();
somdoron's avatar
somdoron committed
1367 1368
}

1369 1370
//  The proxy functionality

1371 1372
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
{
1373 1374 1375 1376
    if (!frontend_ || !backend_) {
        errno = EFAULT;
        return -1;
    }
1377 1378 1379
    return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
                       static_cast<zmq::socket_base_t *> (backend_),
                       static_cast<zmq::socket_base_t *> (capture_));
1380 1381
}

1382 1383 1384 1385
int zmq_proxy_steerable (void *frontend_,
                         void *backend_,
                         void *capture_,
                         void *control_)
Pieter Hintjens's avatar
Pieter Hintjens committed
1386
{
1387 1388 1389 1390
    if (!frontend_ || !backend_) {
        errno = EFAULT;
        return -1;
    }
1391 1392 1393 1394
    return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
                       static_cast<zmq::socket_base_t *> (backend_),
                       static_cast<zmq::socket_base_t *> (capture_),
                       static_cast<zmq::socket_base_t *> (control_));
1395
}
Pieter Hintjens's avatar
Pieter Hintjens committed
1396

1397
//  The deprecated device functionality
Pieter Hintjens's avatar
Pieter Hintjens committed
1398

1399
int zmq_device (int /* type */, void *frontend_, void *backend_)
1400
{
1401 1402
    return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
                       static_cast<zmq::socket_base_t *> (backend_), NULL);
Pieter Hintjens's avatar
Pieter Hintjens committed
1403
}
1404 1405 1406

//  Probe library capabilities; for now, reports on transport and security

1407
int zmq_has (const char *capability_)
1408
{
1409
#if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_OPENVMS)
1410
    if (strcmp (capability_, zmq::protocol_name::ipc) == 0)
1411 1412
        return true;
#endif
1413
#if defined(ZMQ_HAVE_OPENPGM)
1414
    if (strcmp (capability_, "pgm") == 0)
1415 1416
        return true;
#endif
1417
#if defined(ZMQ_HAVE_TIPC)
1418
    if (strcmp (capability_, zmq::protocol_name::tipc) == 0)
1419 1420
        return true;
#endif
1421
#if defined(ZMQ_HAVE_NORM)
1422
    if (strcmp (capability_, "norm") == 0)
1423 1424
        return true;
#endif
1425
#if defined(ZMQ_HAVE_CURVE)
1426
    if (strcmp (capability_, "curve") == 0)
1427 1428
        return true;
#endif
1429
#if defined(HAVE_LIBGSSAPI_KRB5)
1430
    if (strcmp (capability_, "gssapi") == 0)
1431
        return true;
Ilya Kulakov's avatar
Ilya Kulakov committed
1432
#endif
1433
#if defined(ZMQ_HAVE_VMCI)
1434
    if (strcmp (capability_, zmq::protocol_name::vmci) == 0)
Ilya Kulakov's avatar
Ilya Kulakov committed
1435
        return true;
1436
#endif
1437
#if defined(ZMQ_BUILD_DRAFT_API)
1438
    if (strcmp (capability_, "draft") == 0)
1439
        return true;
1440 1441 1442 1443
#endif
    //  Whatever the application asked for, we don't have
    return false;
}