zmq.cpp 40.2 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
Luca Boccassi's avatar
Luca Boccassi committed
29 30 31 32 33 34 35 36 37

// "Tell them I was a writer.
//  A maker of software.
//  A humanist. A father.
//  And many things.
//  But above all, a writer.
//  Thank You. :)"
//  - Pieter Hintjens

38
#include "precompiled.hpp"
39
#define ZMQ_TYPE_UNSAFE
Martin Sustrik's avatar
Martin Sustrik committed
40

41
#include "macros.hpp"
42
#include "poller.hpp"
43

44
#if !defined ZMQ_HAVE_POLLER
45 46 47 48
//  On AIX platform, poll.h has to be included first to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
49
#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS
50 51 52
#include <poll.h>
#endif

53 54 55
#include "polling_util.hpp"
#endif

56
// TODO: determine if this is an issue, since zmq.h is being loaded from pch.
AJ Lewis's avatar
AJ Lewis committed
57
// zmq.h must be included *after* poll.h for AIX to build properly
58
//#include "../include/zmq.h"
AJ Lewis's avatar
AJ Lewis committed
59

60
#if !defined ZMQ_HAVE_WINDOWS
61
#include <unistd.h>
62 63 64
#ifdef ZMQ_HAVE_VXWORKS
#include <strings.h>
#endif
65 66
#endif

skaller's avatar
skaller committed
67
// XSI vector I/O
68
#if defined ZMQ_HAVE_UIO
skaller's avatar
skaller committed
69 70
#include <sys/uio.h>
#else
71 72
//  Fallback definition of the scatter/gather element for builds where
//  ZMQ_HAVE_UIO is not defined and <sys/uio.h> is therefore not included.
struct iovec
{
    //  Start of the buffer.
    void *iov_base;
    //  Length of the buffer in bytes.
    size_t iov_len;
};
#endif

78
#include <string.h>
Martin Sustrik's avatar
Martin Sustrik committed
79 80
#include <stdlib.h>
#include <new>
81
#include <climits>
Martin Sustrik's avatar
Martin Sustrik committed
82

83
#include "proxy.hpp"
84
#include "socket_base.hpp"
85
#include "stdint.hpp"
86
#include "config.hpp"
87
#include "likely.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
88
#include "clock.hpp"
89
#include "ctx.hpp"
90
#include "err.hpp"
91
#include "msg.hpp"
92
#include "fd.hpp"
93
#include "metadata.hpp"
94
#include "socket_poller.hpp"
somdoron's avatar
somdoron committed
95
#include "timers.hpp"
96
#include "ip.hpp"
97
#include "address.hpp"
98

99
#if defined ZMQ_HAVE_OPENPGM
100
#define __PGM_WININT_H__
101 102 103
#include <pgm/pgm.h>
#endif

104
//  Compile time check whether msg_t fits into zmq_msg_t.
105 106
//  A negative array bound is ill-formed, so this typedef only compiles when
//  the internal and the public message types have exactly the same size.
typedef char check_msg_t_size[(sizeof (zmq::msg_t) == sizeof (zmq_msg_t))
                                ? 1
                                : -1];
107

108

109 110
void zmq_version (int *major_, int *minor_, int *patch_)
{
Martin Sustrik's avatar
Martin Sustrik committed
111 112 113
    *major_ = ZMQ_VERSION_MAJOR;
    *minor_ = ZMQ_VERSION_MINOR;
    *patch_ = ZMQ_VERSION_PATCH;
114 115
}

116

117 118
const char *zmq_strerror (int errnum_)
{
119
    return zmq::errno_to_string (errnum_);
120 121
}

122
//  Returns the current value of errno.
int zmq_errno (void)
{
    const int current = errno;
    return current;
}


128
//  New context API
Martin Sustrik's avatar
Martin Sustrik committed
129

130 131
void *zmq_ctx_new (void)
{
132
    //  We do this before the ctx constructor since its embedded mailbox_t
133 134 135 136
    //  object needs the network to be up and running (at least on Windows).
    if (!zmq::initialize_network ()) {
        return NULL;
    }
137

138
    //  Create 0MQ context.
139
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
140 141 142 143 144 145
    if (ctx) {
        if (!ctx->valid ()) {
            delete ctx;
            return NULL;
        }
    }
146 147 148
    return ctx;
}

149
int zmq_ctx_term (void *ctx_)
Martin Sustrik's avatar
Martin Sustrik committed
150
{
151
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
152 153 154
        errno = EFAULT;
        return -1;
    }
155

156
    int rc = (static_cast<zmq::ctx_t *> (ctx_))->terminate ();
157 158
    int en = errno;

159 160
    //  Shut down only if termination was not interrupted by a signal.
    if (!rc || en != EINTR) {
161
        zmq::shutdown_network ();
162
    }
163 164 165

    errno = en;
    return rc;
Martin Sustrik's avatar
Martin Sustrik committed
166 167
}

168 169
int zmq_ctx_shutdown (void *ctx_)
{
170
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
171 172 173
        errno = EFAULT;
        return -1;
    }
174
    return (static_cast<zmq::ctx_t *> (ctx_))->shutdown ();
175 176
}

177 178
int zmq_ctx_set (void *ctx_, int option_, int optval_)
{
179
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
180 181 182
        errno = EFAULT;
        return -1;
    }
183
    return (static_cast<zmq::ctx_t *> (ctx_))->set (option_, optval_);
184 185 186 187
}

int zmq_ctx_get (void *ctx_, int option_)
{
188
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
189 190 191
        errno = EFAULT;
        return -1;
    }
192
    return (static_cast<zmq::ctx_t *> (ctx_))->get (option_);
193 194 195 196 197 198
}

//  Stable/legacy context API

void *zmq_init (int io_threads_)
{
199 200 201 202 203 204
    if (io_threads_ >= 0) {
        void *ctx = zmq_ctx_new ();
        zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
        return ctx;
    }
    errno = EINVAL;
205
    return NULL;
206 207 208 209
}

int zmq_term (void *ctx_)
{
210 211 212 213 214 215
    return zmq_ctx_term (ctx_);
}

//  Alias that simply delegates to zmq_ctx_term.
int zmq_ctx_destroy (void *ctx_)
{
    const int rc = zmq_ctx_term (ctx_);
    return rc;
}


// Sockets
220

221 222 223 224 225 226 227 228 229 230
//  Converts an opaque socket handle into a socket_base_t pointer.
//  Returns NULL and sets errno to ENOTSOCK when the handle is NULL or does
//  not pass the tag check.
static zmq::socket_base_t *as_socket_base_t (void *s_)
{
    if (s_) {
        zmq::socket_base_t *const sock =
          static_cast<zmq::socket_base_t *> (s_);
        if (sock->check_tag ())
            return sock;
    }
    errno = ENOTSOCK;
    return NULL;
}

231
void *zmq_socket (void *ctx_, int type_)
Martin Sustrik's avatar
Martin Sustrik committed
232
{
233
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
234 235 236
        errno = EFAULT;
        return NULL;
    }
237
    zmq::ctx_t *ctx = static_cast<zmq::ctx_t *> (ctx_);
238
    zmq::socket_base_t *s = ctx->create_socket (type_);
239
    return (void *) s;
Martin Sustrik's avatar
Martin Sustrik committed
240 241
}

Martin Sustrik's avatar
Martin Sustrik committed
242
int zmq_close (void *s_)
Martin Sustrik's avatar
Martin Sustrik committed
243
{
244 245
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
246
        return -1;
247
    s->close ();
Martin Sustrik's avatar
Martin Sustrik committed
248 249 250
    return 0;
}

251 252 253 254
int zmq_setsockopt (void *s_,
                    int option_,
                    const void *optval_,
                    size_t optvallen_)
Martin Sustrik's avatar
Martin Sustrik committed
255
{
256 257
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
258
        return -1;
259
    return s->setsockopt (option_, optval_, optvallen_);
Martin Sustrik's avatar
Martin Sustrik committed
260 261
}

262 263
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
264 265
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
266
        return -1;
267
    return s->getsockopt (option_, optval_, optvallen_);
268 269
}

270 271
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
{
272 273
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
274
        return -1;
275
    return s->monitor (addr_, events_);
276 277
}

278
int zmq_join (void *s_, const char *group_)
somdoron's avatar
somdoron committed
279
{
280 281
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
somdoron's avatar
somdoron committed
282
        return -1;
283
    return s->join (group_);
somdoron's avatar
somdoron committed
284 285
}

286
int zmq_leave (void *s_, const char *group_)
somdoron's avatar
somdoron committed
287
{
288 289
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
somdoron's avatar
somdoron committed
290
        return -1;
291
    return s->leave (group_);
somdoron's avatar
somdoron committed
292 293
}

294
int zmq_bind (void *s_, const char *addr_)
Martin Sustrik's avatar
Martin Sustrik committed
295
{
296 297
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
298
        return -1;
299
    return s->bind (addr_);
300 301 302 303
}

int zmq_connect (void *s_, const char *addr_)
{
304 305
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
306
        return -1;
307
    return s->connect (addr_);
skaller's avatar
skaller committed
308 309
}

310
int zmq_unbind (void *s_, const char *addr_)
311
{
312 313
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
314
        return -1;
315
    return s->term_endpoint (addr_);
316 317
}

318
int zmq_disconnect (void *s_, const char *addr_)
319
{
320 321
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
322
        return -1;
323
    return s->term_endpoint (addr_);
324 325
}

326 327
// Sending functions.

328
static inline int
329
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
skaller's avatar
skaller committed
330
{
Min RK's avatar
Min RK committed
331
    size_t sz = zmq_msg_size (msg_);
332
    int rc = s_->send (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
skaller's avatar
skaller committed
333 334
    if (unlikely (rc < 0))
        return -1;
335 336 337 338 339 340

    //  This is what I'd like to do, my C++ fu is too weak -- PH 2016/02/09
    //  int max_msgsz = s_->parent->get (ZMQ_MAX_MSGSZ);
    size_t max_msgsz = INT_MAX;

    //  Truncate returned size to INT_MAX to avoid overflow to negative values
341
    return static_cast<int> (sz < max_msgsz ? sz : max_msgsz);
skaller's avatar
skaller committed
342 343
}

344
/*  To be deprecated once zmq_msg_send() is stable                           */
skaller's avatar
skaller committed
345 346
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
347
    return zmq_msg_send (msg_, s_, flags_);
Martin Sustrik's avatar
Martin Sustrik committed
348 349
}

350
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
351
{
352 353
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
skaller's avatar
skaller committed
354
        return -1;
355
    zmq_msg_t msg;
356
    if (zmq_msg_init_size (&msg, len_))
357 358
        return -1;

359 360 361 362 363 364
    //  We explicitly allow a send from NULL, size zero
    if (len_) {
        assert (buf_);
        memcpy (zmq_msg_data (&msg), buf_, len_);
    }
    int rc = s_sendmsg (s, &msg, flags_);
365
    if (unlikely (rc < 0)) {
366 367 368 369 370 371 372 373
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }
    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
374
    return rc;
Uli Köhler's avatar
Uli Köhler committed
375 376
}

377
int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
Uli Köhler's avatar
Uli Köhler committed
378
{
379 380
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
Uli Köhler's avatar
Uli Köhler committed
381 382
        return -1;
    zmq_msg_t msg;
383 384
    int rc =
      zmq_msg_init_data (&msg, const_cast<void *> (buf_), len_, NULL, NULL);
385
    if (rc != 0)
Uli Köhler's avatar
Uli Köhler committed
386 387
        return -1;

388 389
    rc = s_sendmsg (s, &msg, flags_);
    if (unlikely (rc < 0)) {
Uli Köhler's avatar
Uli Köhler committed
390 391 392 393 394 395 396 397
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }
    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
398
    return rc;
399 400
}

401

skaller's avatar
skaller committed
402
// Send multiple messages.
403
// TODO: this function has no man page
skaller's avatar
skaller committed
404 405 406
//
// If flag bit ZMQ_SNDMORE is set the vector is treated as
// a single multi-part message, i.e. the last message has
skaller's avatar
skaller committed
407
// ZMQ_SNDMORE bit switched off.
skaller's avatar
skaller committed
408
//
409
int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
skaller's avatar
skaller committed
410
{
411 412
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
skaller's avatar
skaller committed
413
        return -1;
414 415 416 417 418
    if (unlikely (count_ <= 0 || !a_)) {
        errno = EINVAL;
        return -1;
    }

skaller's avatar
skaller committed
419 420
    int rc = 0;
    zmq_msg_t msg;
421

422
    for (size_t i = 0; i < count_; ++i) {
skaller's avatar
skaller committed
423
        rc = zmq_msg_init_size (&msg, a_[i].iov_len);
424
        if (rc != 0) {
skaller's avatar
skaller committed
425 426 427 428
            rc = -1;
            break;
        }
        memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
429 430
        if (i == count_ - 1)
            flags_ = flags_ & ~ZMQ_SNDMORE;
431 432
        rc = s_sendmsg (s, &msg, flags_);
        if (unlikely (rc < 0)) {
433 434 435 436 437 438
            int err = errno;
            int rc2 = zmq_msg_close (&msg);
            errno_assert (rc2 == 0);
            errno = err;
            rc = -1;
            break;
skaller's avatar
skaller committed
439 440
        }
    }
441
    return rc;
skaller's avatar
skaller committed
442 443
}

444 445
// Receiving functions.

446
//  Receives one message into msg_ on an already validated socket.
//  Returns the size of the message, capped at INT_MAX, or -1 on failure.
static int s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    if (unlikely (s_->recv (msg, flags_) < 0))
        return -1;

    //  Truncate returned size to INT_MAX to avoid overflow to negative values
    const size_t size = zmq_msg_size (msg_);
    const size_t limit = INT_MAX;
    if (size >= limit)
        return INT_MAX;
    return static_cast<int> (size);
}

457
/*  To be deprecated once zmq_msg_recv() is stable                           */
skaller's avatar
skaller committed
458 459
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
460
    return zmq_msg_recv (msg_, s_, flags_);
skaller's avatar
skaller committed
461 462 463
}


464
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
465
{
466 467
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
skaller's avatar
skaller committed
468
        return -1;
469 470 471 472
    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg);
    errno_assert (rc == 0);

473
    int nbytes = s_recvmsg (s, &msg, flags_);
474
    if (unlikely (nbytes < 0)) {
475
        int err = errno;
476 477
        rc = zmq_msg_close (&msg);
        errno_assert (rc == 0);
478 479 480 481
        errno = err;
        return -1;
    }

482
    //  An oversized message is silently truncated.
483
    size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
484

485 486 487 488 489
    //  We explicitly allow a null buffer argument if len is zero
    if (to_copy) {
        assert (buf_);
        memcpy (buf_, zmq_msg_data (&msg), to_copy);
    }
490 491 492
    rc = zmq_msg_close (&msg);
    errno_assert (rc == 0);

493
    return nbytes;
494 495
}

skaller's avatar
skaller committed
496
// Receive a multi-part message
497
//
skaller's avatar
skaller committed
498 499 500 501 502 503 504 505 506 507
// Receives up to *count_ parts of a multi-part message.
// Sets *count_ to the actual number of parts read.
// ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
// Returns number of message parts read, or -1 on error.
//
// Note: even if -1 is returned, some parts of the message
// may have been read. Therefore the client must consult
// *count_ to retrieve message parts successfully read,
// even if -1 is returned.
//
508
// The iov_base* buffers of each iovec *a_ filled in by this
skaller's avatar
skaller committed
509
// function may be freed using free().
510
// TODO: this function has no man page
skaller's avatar
skaller committed
511
//
512
int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
skaller's avatar
skaller committed
513
{
514 515
    zmq::socket_base_t *s = as_socket_base_t (s_);
    if (!s)
skaller's avatar
skaller committed
516
        return -1;
517 518 519 520 521
    if (unlikely (!count_ || *count_ <= 0 || !a_)) {
        errno = EINVAL;
        return -1;
    }

522
    size_t count = *count_;
skaller's avatar
skaller committed
523 524
    int nread = 0;
    bool recvmore = true;
525

526
    *count_ = 0;
skaller's avatar
skaller committed
527

528
    for (size_t i = 0; recvmore && i < count; ++i) {
skaller's avatar
skaller committed
529 530 531 532
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        errno_assert (rc == 0);

533
        int nbytes = s_recvmsg (s, &msg, flags_);
skaller's avatar
skaller committed
534 535 536 537 538 539 540 541 542 543
        if (unlikely (nbytes < 0)) {
            int err = errno;
            rc = zmq_msg_close (&msg);
            errno_assert (rc == 0);
            errno = err;
            nread = -1;
            break;
        }

        a_[i].iov_len = zmq_msg_size (&msg);
544
        a_[i].iov_base = static_cast<char *> (malloc (a_[i].iov_len));
545
        if (unlikely (!a_[i].iov_base)) {
546 547 548
            errno = ENOMEM;
            return -1;
        }
549 550
        memcpy (a_[i].iov_base, static_cast<char *> (zmq_msg_data (&msg)),
                a_[i].iov_len);
skaller's avatar
skaller committed
551
        // Assume zmq_socket ZMQ_RVCMORE is properly set.
552
        const zmq::msg_t *p_msg = reinterpret_cast<const zmq::msg_t *> (&msg);
553 554
        recvmore = p_msg->flags () & zmq::msg_t::more;
        rc = zmq_msg_close (&msg);
555 556 557
        errno_assert (rc == 0);
        ++*count_;
        ++nread;
skaller's avatar
skaller committed
558
    }
559
    return nread;
skaller's avatar
skaller committed
560 561
}

562 563
// Message manipulators.

564 565
//  Initialises the message by forwarding to msg_t::init.
int zmq_msg_init (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->init ();
}

//  Initialises the message with the requested size by forwarding to
//  msg_t::init_size.
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->init_size (size_);
}

574 575
//  Initialises the message on a caller-supplied buffer by forwarding all
//  arguments to msg_t::init_data.
int zmq_msg_init_data (
  zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->init_data (data_, size_, ffn_, hint_);
}

581
//  Sends msg_ on the socket via s_sendmsg.
//  Returns -1 when the handle is not a valid socket.
int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *const sock = as_socket_base_t (s_);
    return sock ? s_sendmsg (sock, msg_, flags_) : -1;
}

589
//  Receives into msg_ from the socket via s_recvmsg.
//  Returns -1 when the handle is not a valid socket.
int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *const sock = as_socket_base_t (s_);
    return sock ? s_recvmsg (sock, msg_, flags_) : -1;
}

597 598
//  Closes the message by forwarding to msg_t::close.
int zmq_msg_close (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->close ();
}

//  Forwards to msg_t::move on dest_ with src_ as the argument.
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *const dest = reinterpret_cast<zmq::msg_t *> (dest_);
    zmq::msg_t *const src = reinterpret_cast<zmq::msg_t *> (src_);
    return dest->move (*src);
}

//  Forwards to msg_t::copy on dest_ with src_ as the argument.
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *const dest = reinterpret_cast<zmq::msg_t *> (dest_);
    zmq::msg_t *const src = reinterpret_cast<zmq::msg_t *> (src_);
    return dest->copy (*src);
}

//  Returns the pointer reported by msg_t::data for this message.
void *zmq_msg_data (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->data ();
}

619
size_t zmq_msg_size (const zmq_msg_t *msg_)
620
{
621
    return ((zmq::msg_t *) msg_)->size ();
622 623
}

624
int zmq_msg_more (const zmq_msg_t *msg_)
625
{
626
    return zmq_msg_get (msg_, ZMQ_MORE);
627 628
}

629
int zmq_msg_get (const zmq_msg_t *msg_, int property_)
630
{
631
    const char *fd_string;
632

633
    switch (property_) {
634
        case ZMQ_MORE:
635
            return (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::more) ? 1 : 0;
636
        case ZMQ_SRCFD:
637
            fd_string = zmq_msg_gets (msg_, "__fd");
638
            if (fd_string == NULL)
639
                return -1;
640

641
            return atoi (fd_string);
642
        case ZMQ_SHARED:
643 644 645 646
            return (((zmq::msg_t *) msg_)->is_cmsg ())
                       || (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::shared)
                     ? 1
                     : 0;
647
        default:
648 649
            errno = EINVAL;
            return -1;
650 651 652
    }
}

653
//  Always fails with errno set to EINVAL: there is currently no settable
//  message property, so all three arguments are ignored.
int zmq_msg_set (zmq_msg_t *, int, int)
{
    //  No properties supported at present
    errno = EINVAL;
    return -1;
}

660 661
//  Stores the routing id on the message by forwarding to
//  msg_t::set_routing_id.
int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->set_routing_id (routing_id_);
}

666
//  Returns the routing id reported by msg_t::get_routing_id.
uint32_t zmq_msg_routing_id (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->get_routing_id ();
}
670

somdoron's avatar
somdoron committed
671 672
//  Stores the group on the message by forwarding to msg_t::set_group.
int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->set_group (group_);
}

//  Returns the group reported by msg_t::group.
const char *zmq_msg_group (zmq_msg_t *msg_)
{
    zmq::msg_t *const msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->group ();
}

681 682
//  Get message metadata string

683
const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_)
684
{
685 686
    const zmq::metadata_t *metadata =
      reinterpret_cast<const zmq::msg_t *> (msg_)->metadata ();
687
    const char *value = NULL;
688
    if (metadata)
689 690 691
        value = metadata->get (std::string (property_));
    if (value)
        return value;
692 693 694

    errno = EINVAL;
    return NULL;
695 696
}

697
    // Polling.
698

699 700 701 702 703
#if defined ZMQ_HAVE_POLLER
//  Implements zmq_poll on top of zmq_poller.
//  Registers every item with a temporary socket_poller_t, waits for events
//  and writes the fired events back into items_[].revents.
//  Returns the result of zmq_poller_wait_all, 0 when that call failed with
//  EAGAIN, or the negative result of the first failing registration or wait.
inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
    int rc;
    zmq::socket_poller_t poller;
    zmq_poller_event_t *events =
      new (std::nothrow) zmq_poller_event_t[nitems_];
    alloc_assert (events);

    //  Set as soon as any socket or fd appears more than once in items_.
    bool repeat_items = false;

    //  Register sockets with poller
    for (int i = 0; i < nitems_; i++) {
        zmq_pollitem_t &item = items_[i];
        item.revents = 0;

        const bool is_socket = item.socket != NULL;
        bool seen_before = false;
        short mask = item.events;

        //  Check for repeat entries among the preceding items. A repeated
        //  entry is re-registered with the union of the requested events.
        for (int j = 0; j < i; ++j) {
            const bool same =
              is_socket
                ? items_[j].socket == item.socket
                : (!items_[j].socket && items_[j].fd == item.fd);
            if (same) {
                seen_before = true;
                mask |= items_[j].events;
            }
        }
        if (seen_before)
            repeat_items = true;

        if (is_socket) {
            //  Poll item is a 0MQ socket.
            rc = seen_before
                   ? zmq_poller_modify (&poller, item.socket, mask)
                   : zmq_poller_add (&poller, item.socket, NULL, mask);
        } else {
            //  Poll item is a raw file descriptor.
            rc = seen_before
                   ? zmq_poller_modify_fd (&poller, item.fd, mask)
                   : zmq_poller_add_fd (&poller, item.fd, NULL, mask);
        }
        if (rc < 0) {
            delete[] events;
            return rc;
        }
    }

    //  Wait for events
    rc = zmq_poller_wait_all (&poller, events, nitems_, timeout_);
    if (rc < 0) {
        delete[] events;
        //  EAGAIN is reported to the caller as "no events".
        if (zmq_errno () == EAGAIN)
            return 0;
        return rc;
    }

    //  Transform poller events into zmq_pollitem events.
    //  items_ contains all items, while events only contains fired events.
    //  If no sockets are repeated (likely), the two are still co-ordered, so
    //  only the first not yet consumed event has to be checked per item.
    //  If there are repeat items, they cannot be assumed to be co-ordered,
    //  so each pollitem must check fired events from the beginning.
    const int found_events = rc;
    int j_start = 0;
    for (int i = 0; i < nitems_; i++) {
        zmq_pollitem_t &item = items_[i];
        for (int j = j_start; j < found_events; ++j) {
            const zmq_poller_event_t &fired = events[j];
            const bool hit = item.socket
                               ? item.socket == fired.socket
                               : (!fired.socket && item.fd == fired.fd);
            if (hit) {
                item.revents = fired.events & item.events;
                //  no repeats, we can ignore events we've already seen
                if (!repeat_items)
                    j_start++;
                break;
            }
            //  no repeats, never have to look at j > j_start
            if (!repeat_items)
                break;
        }
    }

    //  Cleanup
    delete[] events;
    return rc;
}
#endif // ZMQ_HAVE_POLLER

799 800
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
801
#if defined ZMQ_HAVE_POLLER
802 803 804 805 806 807 808 809 810
    // if poller is present, use that if there is at least 1 thread-safe socket,
    // otherwise fall back to the previous implementation as it's faster.
    for (int i = 0; i != nitems_; i++) {
        if (items_[i].socket
            && as_socket_base_t (items_[i].socket)->is_thread_safe ()) {
            return zmq_poller_poll (items_, nitems_, timeout_);
        }
    }
#endif // ZMQ_HAVE_POLLER
811
#if defined ZMQ_POLL_BASED_ON_POLL || defined ZMQ_POLL_BASED_ON_SELECT
812 813 814 815 816 817 818 819 820 821
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
        return 0;
822 823 824 825 826
#elif defined ZMQ_HAVE_VXWORKS
        struct timespec ns_;
        ns_.tv_sec = timeout_ / 1000;
        ns_.tv_nsec = timeout_ % 1000 * 1000000;
        return nanosleep (&ns_, 0);
827 828 829 830 831 832 833 834 835
#else
        return usleep (timeout_ * 1000);
#endif
    }
    if (!items_) {
        errno = EFAULT;
        return -1;
    }

836 837 838 839
    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;
#if defined ZMQ_POLL_BASED_ON_POLL
840
    zmq::fast_vector_t<pollfd, ZMQ_POLLITEMS_DFLT> pollfds (nitems_);
841 842 843 844 845

    //  Build pollset for poll () system call.
    for (int i = 0; i != nitems_; i++) {
        //  If the poll item is a 0MQ socket, we poll on the file descriptor
        //  retrieved by the ZMQ_FD socket option.
846
        if (items_[i].socket) {
847
            size_t zmq_fd_size = sizeof (zmq::fd_t);
848 849 850
            if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &pollfds[i].fd,
                                &zmq_fd_size)
                == -1) {
851 852
                return -1;
            }
853
            pollfds[i].events = items_[i].events ? POLLIN : 0;
854 855 856 857
        }
        //  Else, the poll item is a raw file descriptor. Just convert the
        //  events to normal POLLIN/POLLOUT for poll ().
        else {
858 859 860 861 862
            pollfds[i].fd = items_[i].fd;
            pollfds[i].events =
              (items_[i].events & ZMQ_POLLIN ? POLLIN : 0)
              | (items_[i].events & ZMQ_POLLOUT ? POLLOUT : 0)
              | (items_[i].events & ZMQ_POLLPRI ? POLLPRI : 0);
863 864
        }
    }
865 866 867
#else
    //  Ensure we do not attempt to select () on more than FD_SETSIZE
    //  file descriptors.
868
    //  TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here
869 870
    zmq_assert (nitems_ <= FD_SETSIZE);

871
    zmq::optimized_fd_set_t pollset_in (nitems_);
872
    FD_ZERO (pollset_in.get ());
873
    zmq::optimized_fd_set_t pollset_out (nitems_);
874
    FD_ZERO (pollset_out.get ());
875
    zmq::optimized_fd_set_t pollset_err (nitems_);
876
    FD_ZERO (pollset_err.get ());
877 878 879 880 881 882 883 884 885 886 887 888 889 890 891

    zmq::fd_t maxfd = 0;

    //  Build the fd_sets for passing to select ().
    for (int i = 0; i != nitems_; i++) {
        //  If the poll item is a 0MQ socket we are interested in input on the
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
        if (items_[i].socket) {
            size_t zmq_fd_size = sizeof (zmq::fd_t);
            zmq::fd_t notify_fd;
            if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &notify_fd,
                                &zmq_fd_size)
                == -1)
                return -1;
            if (items_[i].events) {
892
                FD_SET (notify_fd, pollset_in.get ());
893 894 895 896 897 898 899 900
                if (maxfd < notify_fd)
                    maxfd = notify_fd;
            }
        }
        //  Else, the poll item is a raw file descriptor. Convert the poll item
        //  events to the appropriate fd_sets.
        else {
            if (items_[i].events & ZMQ_POLLIN)
901
                FD_SET (items_[i].fd, pollset_in.get ());
902
            if (items_[i].events & ZMQ_POLLOUT)
903
                FD_SET (items_[i].fd, pollset_out.get ());
904
            if (items_[i].events & ZMQ_POLLERR)
905
                FD_SET (items_[i].fd, pollset_err.get ());
906 907 908 909 910
            if (maxfd < items_[i].fd)
                maxfd = items_[i].fd;
        }
    }

911 912 913
    zmq::optimized_fd_set_t inset (nitems_);
    zmq::optimized_fd_set_t outset (nitems_);
    zmq::optimized_fd_set_t errset (nitems_);
914
#endif
915 916 917 918 919

    bool first_pass = true;
    int nevents = 0;

    while (true) {
920 921
#if defined ZMQ_POLL_BASED_ON_POLL

922
        //  Compute the timeout for the subsequent poll.
923 924
        zmq::timeout_t timeout =
          zmq::compute_timeout (first_pass, timeout_, now, end);
925 926

        //  Wait for events.
927
        {
928
            int rc = poll (&pollfds[0], nitems_, timeout);
929 930 931 932 933 934 935
            if (rc == -1 && errno == EINTR) {
                return -1;
            }
            errno_assert (rc >= 0);
        }
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
936
            items_[i].revents = 0;
937 938 939

            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
940
            if (items_[i].socket) {
941 942
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
943 944 945
                if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
                                    &zmq_events_size)
                    == -1) {
946 947
                    return -1;
                }
948 949 950 951 952 953
                if ((items_[i].events & ZMQ_POLLOUT)
                    && (zmq_events & ZMQ_POLLOUT))
                    items_[i].revents |= ZMQ_POLLOUT;
                if ((items_[i].events & ZMQ_POLLIN)
                    && (zmq_events & ZMQ_POLLIN))
                    items_[i].revents |= ZMQ_POLLIN;
954 955 956 957
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
958 959 960 961 962 963 964 965
                if (pollfds[i].revents & POLLIN)
                    items_[i].revents |= ZMQ_POLLIN;
                if (pollfds[i].revents & POLLOUT)
                    items_[i].revents |= ZMQ_POLLOUT;
                if (pollfds[i].revents & POLLPRI)
                    items_[i].revents |= ZMQ_POLLPRI;
                if (pollfds[i].revents & ~(POLLIN | POLLOUT | POLLPRI))
                    items_[i].revents |= ZMQ_POLLERR;
966 967
            }

968
            if (items_[i].revents)
969 970 971 972 973 974 975 976 977 978 979 980
                nevents++;
        }

#else

        //  Compute the timeout for the subsequent poll.
        timeval timeout;
        timeval *ptimeout;
        if (first_pass) {
            timeout.tv_sec = 0;
            timeout.tv_usec = 0;
            ptimeout = &timeout;
981
        } else if (timeout_ < 0)
982 983
            ptimeout = NULL;
        else {
984 985
            timeout.tv_sec = static_cast<long> ((end - now) / 1000);
            timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
986 987 988 989 990
            ptimeout = &timeout;
        }

        //  Wait for events. Ignore interrupts if there's infinite timeout.
        while (true) {
991
            memcpy (inset.get (), pollset_in.get (),
992
                    zmq::valid_pollset_bytes (*pollset_in.get ()));
993
            memcpy (outset.get (), pollset_out.get (),
994
                    zmq::valid_pollset_bytes (*pollset_out.get ()));
995
            memcpy (errset.get (), pollset_err.get (),
996
                    zmq::valid_pollset_bytes (*pollset_err.get ()));
997
#if defined ZMQ_HAVE_WINDOWS
998 999
            int rc =
              select (0, inset.get (), outset.get (), errset.get (), ptimeout);
1000 1001 1002 1003 1004 1005
            if (unlikely (rc == SOCKET_ERROR)) {
                errno = zmq::wsa_error_to_errno (WSAGetLastError ());
                wsa_assert (errno == ENOTSOCK);
                return -1;
            }
#else
1006 1007
            int rc = select (maxfd + 1, inset.get (), outset.get (),
                             errset.get (), ptimeout);
1008 1009 1010 1011 1012 1013 1014 1015 1016 1017
            if (unlikely (rc == -1)) {
                errno_assert (errno == EINTR || errno == EBADF);
                return -1;
            }
#endif
            break;
        }

        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
1018
            items_[i].revents = 0;
1019 1020 1021

            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
1022
            if (items_[i].socket) {
1023 1024
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
1025 1026 1027
                if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
                                    &zmq_events_size)
                    == -1)
1028
                    return -1;
1029 1030 1031 1032 1033 1034
                if ((items_[i].events & ZMQ_POLLOUT)
                    && (zmq_events & ZMQ_POLLOUT))
                    items_[i].revents |= ZMQ_POLLOUT;
                if ((items_[i].events & ZMQ_POLLIN)
                    && (zmq_events & ZMQ_POLLIN))
                    items_[i].revents |= ZMQ_POLLIN;
1035 1036 1037 1038
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
1039
                if (FD_ISSET (items_[i].fd, inset.get ()))
1040
                    items_[i].revents |= ZMQ_POLLIN;
1041
                if (FD_ISSET (items_[i].fd, outset.get ()))
1042
                    items_[i].revents |= ZMQ_POLLOUT;
1043
                if (FD_ISSET (items_[i].fd, errset.get ()))
1044
                    items_[i].revents |= ZMQ_POLLERR;
1045 1046
            }

1047
            if (items_[i].revents)
1048 1049
                nevents++;
        }
1050
#endif
1051

1052
        //  If timeout is zero, exit immediately whether there are events or not.
1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083
        if (timeout_ == 0)
            break;

        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
            continue;
        }

        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling has begun. (We assume that
        //  the first pass has taken negligible time.) We also compute the
        //  time when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
            end = now + timeout_;
            if (now == end)
                break;
            first_pass = false;
            continue;
        }

        //  Find out whether the timeout has expired.
        now = clock.now_ms ();
        if (now >= end)
1084
            break;
1085 1086 1087 1088 1089 1090 1091 1092
    }

    return nevents;
#else
    //  Exotic platforms that support neither poll() nor select().
    errno = ENOTSUP;
    return -1;
#endif
1093
}
1094

1095 1096
//  The poller functionality

1097
void *zmq_poller_new (void)
1098 1099
{
    zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t;
1100 1101 1102
    if (!poller) {
        errno = ENOMEM;
    }
1103 1104 1105
    return poller;
}

1106
int zmq_poller_destroy (void **poller_p_)
1107
{
1108 1109 1110 1111 1112 1113 1114 1115
    if (poller_p_) {
        zmq::socket_poller_t *const poller =
          static_cast<zmq::socket_poller_t *> (*poller_p_);
        if (poller && poller->check_tag ()) {
            delete poller;
            *poller_p_ = NULL;
            return 0;
        }
1116
    }
1117 1118
    errno = EFAULT;
    return -1;
1119 1120
}

1121

1122
static int check_poller (void *const poller_)
1123
{
1124 1125
    if (!poller_
        || !(static_cast<zmq::socket_poller_t *> (poller_))->check_tag ()) {
1126 1127 1128 1129
        errno = EFAULT;
        return -1;
    }

1130 1131 1132
    return 0;
}

1133 1134 1135 1136 1137 1138 1139 1140 1141
//  Validates an event mask. Returns 0 when events_ contains only bits from
//  ZMQ_POLLIN, ZMQ_POLLOUT, ZMQ_POLLERR and ZMQ_POLLPRI; otherwise sets
//  errno to EINVAL and returns -1.
static int check_events (const short events_)
{
    const int supported_mask =
      ZMQ_POLLIN | ZMQ_POLLOUT | ZMQ_POLLERR | ZMQ_POLLPRI;

    //  Any bit outside the supported set makes the mask invalid.
    if ((events_ & ~supported_mask) == 0)
        return 0;

    errno = EINVAL;
    return -1;
}

1142 1143 1144 1145 1146
static int check_poller_registration_args (void *const poller_, void *const s_)
{
    if (-1 == check_poller (poller_))
        return -1;

1147
    if (!s_ || !(static_cast<zmq::socket_base_t *> (s_))->check_tag ()) {
1148 1149 1150
        errno = ENOTSOCK;
        return -1;
    }
1151 1152 1153 1154 1155

    return 0;
}

static int check_poller_fd_registration_args (void *const poller_,
1156
                                              const zmq::fd_t fd_)
1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170
{
    if (-1 == check_poller (poller_))
        return -1;

    if (fd_ == zmq::retired_fd) {
        errno = EBADF;
        return -1;
    }

    return 0;
}

int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_)
{
1171 1172
    if (-1 == check_poller_registration_args (poller_, s_)
        || -1 == check_events (events_))
1173 1174
        return -1;

1175
    zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1176

1177
    return (static_cast<zmq::socket_poller_t *> (poller_))
1178
      ->add (socket, user_data_, events_);
1179 1180
}

1181
int zmq_poller_add_fd (void *poller_,
1182
                       zmq::fd_t fd_,
1183 1184
                       void *user_data_,
                       short events_)
1185
{
1186 1187
    if (-1 == check_poller_fd_registration_args (poller_, fd_)
        || -1 == check_events (events_))
1188
        return -1;
1189

1190
    return (static_cast<zmq::socket_poller_t *> (poller_))
1191
      ->add_fd (fd_, user_data_, events_);
1192 1193
}

1194

1195
int zmq_poller_modify (void *poller_, void *s_, short events_)
1196
{
1197 1198
    if (-1 == check_poller_registration_args (poller_, s_)
        || -1 == check_events (events_))
1199 1200
        return -1;

1201
    zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1202

1203 1204
    return (static_cast<zmq::socket_poller_t *> (poller_))
      ->modify (socket, events_);
1205 1206
}

1207
int zmq_poller_modify_fd (void *poller_, zmq::fd_t fd_, short events_)
1208
{
1209 1210
    if (-1 == check_poller_fd_registration_args (poller_, fd_)
        || -1 == check_events (events_))
1211
        return -1;
1212

1213 1214
    return (static_cast<zmq::socket_poller_t *> (poller_))
      ->modify_fd (fd_, events_);
1215 1216
}

1217
int zmq_poller_remove (void *poller_, void *s_)
1218
{
1219
    if (-1 == check_poller_registration_args (poller_, s_))
1220 1221
        return -1;

1222
    zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1223

1224
    return (static_cast<zmq::socket_poller_t *> (poller_))->remove (socket);
1225 1226
}

1227
int zmq_poller_remove_fd (void *poller_, zmq::fd_t fd_)
1228
{
1229
    if (-1 == check_poller_fd_registration_args (poller_, fd_))
1230
        return -1;
1231

1232
    return (static_cast<zmq::socket_poller_t *> (poller_))->remove_fd (fd_);
1233
}
somdoron's avatar
somdoron committed
1234

1235
int zmq_poller_wait (void *poller_, zmq_poller_event_t *event_, long timeout_)
1236
{
1237
    int rc = zmq_poller_wait_all (poller_, event_, 1, timeout_);
1238

1239 1240 1241
    if (rc < 0 && event_) {
        // TODO this is not portable... zmq_poller_event_t contains pointers,
        // for which nullptr does not need to be represented by all-zeroes
1242
        memset (event_, 0, sizeof (zmq_poller_event_t));
1243
    }
1244 1245
    // wait_all returns number of events, but we return 0 for any success
    return rc >= 0 ? 0 : rc;
1246 1247
}

1248 1249
int zmq_poller_wait_all (void *poller_,
                         zmq_poller_event_t *events_,
1250
                         int n_events_,
1251
                         long timeout_)
1252
{
1253
    if (-1 == check_poller (poller_))
1254
        return -1;
1255

1256 1257 1258 1259
    if (!events_) {
        errno = EFAULT;
        return -1;
    }
1260
    if (n_events_ < 0) {
1261 1262 1263
        errno = EINVAL;
        return -1;
    }
1264

1265
    int rc =
1266 1267
      (static_cast<zmq::socket_poller_t *> (poller_))
        ->wait (reinterpret_cast<zmq::socket_poller_t::event_t *> (events_),
1268
                n_events_, timeout_);
1269 1270 1271 1272

    return rc;
}

1273 1274
//  Peer-specific state

1275
int zmq_socket_get_peer_state (void *s_,
1276 1277
                               const void *routing_id_,
                               size_t routing_id_size_)
1278
{
1279
    const zmq::socket_base_t *const s = as_socket_base_t (s_);
1280 1281 1282
    if (!s)
        return -1;

1283
    return s->get_peer_state (routing_id_, routing_id_size_);
1284 1285
}

somdoron's avatar
somdoron committed
1286 1287
//  Timers

1288
void *zmq_timers_new (void)
somdoron's avatar
somdoron committed
1289 1290 1291 1292 1293 1294
{
    zmq::timers_t *timers = new (std::nothrow) zmq::timers_t;
    alloc_assert (timers);
    return timers;
}

1295
int zmq_timers_destroy (void **timers_p_)
somdoron's avatar
somdoron committed
1296
{
1297
    void *timers = *timers_p_;
1298
    if (!timers || !(static_cast<zmq::timers_t *> (timers))->check_tag ()) {
somdoron's avatar
somdoron committed
1299 1300 1301
        errno = EFAULT;
        return -1;
    }
1302
    delete (static_cast<zmq::timers_t *> (timers));
1303
    *timers_p_ = NULL;
somdoron's avatar
somdoron committed
1304 1305 1306
    return 0;
}

1307 1308 1309 1310
int zmq_timers_add (void *timers_,
                    size_t interval_,
                    zmq_timer_fn handler_,
                    void *arg_)
somdoron's avatar
somdoron committed
1311
{
1312
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
somdoron's avatar
somdoron committed
1313 1314 1315 1316
        errno = EFAULT;
        return -1;
    }

1317 1318
    return (static_cast<zmq::timers_t *> (timers_))
      ->add (interval_, handler_, arg_);
somdoron's avatar
somdoron committed
1319 1320 1321 1322
}

int zmq_timers_cancel (void *timers_, int timer_id_)
{
1323
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
somdoron's avatar
somdoron committed
1324 1325 1326 1327
        errno = EFAULT;
        return -1;
    }

1328
    return (static_cast<zmq::timers_t *> (timers_))->cancel (timer_id_);
somdoron's avatar
somdoron committed
1329 1330 1331 1332
}

int zmq_timers_set_interval (void *timers_, int timer_id_, size_t interval_)
{
1333
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
somdoron's avatar
somdoron committed
1334 1335 1336 1337
        errno = EFAULT;
        return -1;
    }

1338 1339
    return (static_cast<zmq::timers_t *> (timers_))
      ->set_interval (timer_id_, interval_);
somdoron's avatar
somdoron committed
1340 1341 1342 1343
}

int zmq_timers_reset (void *timers_, int timer_id_)
{
1344
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
somdoron's avatar
somdoron committed
1345 1346 1347 1348
        errno = EFAULT;
        return -1;
    }

1349
    return (static_cast<zmq::timers_t *> (timers_))->reset (timer_id_);
somdoron's avatar
somdoron committed
1350 1351 1352 1353
}

long zmq_timers_timeout (void *timers_)
{
1354
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
somdoron's avatar
somdoron committed
1355 1356 1357 1358
        errno = EFAULT;
        return -1;
    }

1359
    return (static_cast<zmq::timers_t *> (timers_))->timeout ();
somdoron's avatar
somdoron committed
1360 1361 1362 1363
}

int zmq_timers_execute (void *timers_)
{
1364
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
somdoron's avatar
somdoron committed
1365 1366 1367 1368
        errno = EFAULT;
        return -1;
    }

1369
    return (static_cast<zmq::timers_t *> (timers_))->execute ();
somdoron's avatar
somdoron committed
1370 1371
}

1372 1373
//  The proxy functionality

1374 1375
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
{
1376 1377 1378 1379
    if (!frontend_ || !backend_) {
        errno = EFAULT;
        return -1;
    }
1380 1381 1382
    return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
                       static_cast<zmq::socket_base_t *> (backend_),
                       static_cast<zmq::socket_base_t *> (capture_));
1383 1384
}

1385 1386 1387 1388
int zmq_proxy_steerable (void *frontend_,
                         void *backend_,
                         void *capture_,
                         void *control_)
Pieter Hintjens's avatar
Pieter Hintjens committed
1389
{
1390 1391 1392 1393
    if (!frontend_ || !backend_) {
        errno = EFAULT;
        return -1;
    }
1394 1395 1396 1397
    return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
                       static_cast<zmq::socket_base_t *> (backend_),
                       static_cast<zmq::socket_base_t *> (capture_),
                       static_cast<zmq::socket_base_t *> (control_));
1398
}
Pieter Hintjens's avatar
Pieter Hintjens committed
1399

1400
//  The deprecated device functionality
Pieter Hintjens's avatar
Pieter Hintjens committed
1401

1402
int zmq_device (int /* type */, void *frontend_, void *backend_)
1403
{
1404 1405
    return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
                       static_cast<zmq::socket_base_t *> (backend_), NULL);
Pieter Hintjens's avatar
Pieter Hintjens committed
1406
}
1407 1408 1409

//  Probe library capabilities; for now, reports on transport and security

1410
int zmq_has (const char *capability_)
1411
{
1412
#if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_OPENVMS)
1413
    if (strcmp (capability_, zmq::protocol_name::ipc) == 0)
1414 1415
        return true;
#endif
1416
#if defined(ZMQ_HAVE_OPENPGM)
1417
    if (strcmp (capability_, "pgm") == 0)
1418 1419
        return true;
#endif
1420
#if defined(ZMQ_HAVE_TIPC)
1421
    if (strcmp (capability_, zmq::protocol_name::tipc) == 0)
1422 1423
        return true;
#endif
1424
#if defined(ZMQ_HAVE_NORM)
1425
    if (strcmp (capability_, "norm") == 0)
1426 1427
        return true;
#endif
1428
#if defined(ZMQ_HAVE_CURVE)
1429
    if (strcmp (capability_, "curve") == 0)
1430 1431
        return true;
#endif
1432
#if defined(HAVE_LIBGSSAPI_KRB5)
1433
    if (strcmp (capability_, "gssapi") == 0)
1434
        return true;
Ilya Kulakov's avatar
Ilya Kulakov committed
1435
#endif
1436
#if defined(ZMQ_HAVE_VMCI)
1437
    if (strcmp (capability_, zmq::protocol_name::vmci) == 0)
Ilya Kulakov's avatar
Ilya Kulakov committed
1438
        return true;
1439
#endif
1440
#if defined(ZMQ_BUILD_DRAFT_API)
1441
    if (strcmp (capability_, "draft") == 0)
1442
        return true;
1443 1444 1445 1446
#endif
    //  Whatever the application asked for, we don't have
    return false;
}