zmq.cpp 30.4 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
29
#define ZMQ_TYPE_UNSAFE
Martin Sustrik's avatar
Martin Sustrik committed
30

31
#include "poller.hpp"
32 33 34 35 36 37

//  On AIX platform, poll.h has to be included first to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
#if defined ZMQ_POLL_BASED_ON_POLL
38 39 40
#include <poll.h>
#endif

AJ Lewis's avatar
AJ Lewis committed
41 42 43
// zmq.h must be included *after* poll.h for AIX to build properly
#include "../include/zmq.h"

44
#if defined ZMQ_HAVE_WINDOWS
45 46 47 48 49
#include "windows.hpp"
#else
#include <unistd.h>
#endif

skaller's avatar
skaller committed
50

skaller's avatar
skaller committed
51
// XSI vector I/O
52
#if defined ZMQ_HAVE_UIO
skaller's avatar
skaller committed
53 54
#include <sys/uio.h>
#else
55
//  Fallback definition used only when ZMQ_HAVE_UIO is not defined, i.e. when
//  <sys/uio.h> is not included above. Describes one contiguous buffer.
struct iovec {
    //  Start of the buffer.
    void *iov_base;
    //  Length of the buffer in bytes.
    size_t iov_len;
};
#endif


62
#include <limits.h>
#include <string.h>
#include <stdlib.h>
#include <new>

66
#include "proxy.hpp"
67
#include "socket_base.hpp"
68
#include "stdint.hpp"
69
#include "config.hpp"
70
#include "likely.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
71
#include "clock.hpp"
72
#include "ctx.hpp"
73
#include "err.hpp"
74
#include "msg.hpp"
75
#include "fd.hpp"
76
#include "metadata.hpp"
77

78 79 80
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#endif
Martin Sustrik's avatar
Martin Sustrik committed
81

82
#if defined ZMQ_HAVE_OPENPGM
83
#define __PGM_WININT_H__
84 85 86
#include <pgm/pgm.h>
#endif

87 88 89 90
//  Compile-time guard: the array bound evaluates to -1 (which is ill-formed)
//  unless the public zmq_msg_t and the internal zmq::msg_t have equal sizes.
typedef char check_msg_t_size
    [sizeof (zmq_msg_t) == sizeof (zmq::msg_t) ? 1 : -1];

91

92 93
void zmq_version (int *major_, int *minor_, int *patch_)
{
Martin Sustrik's avatar
Martin Sustrik committed
94 95 96
    *major_ = ZMQ_VERSION_MAJOR;
    *minor_ = ZMQ_VERSION_MINOR;
    *patch_ = ZMQ_VERSION_PATCH;
97 98
}

99

100 101
const char *zmq_strerror (int errnum_)
{
102
    return zmq::errno_to_string (errnum_);
103 104
}

105
//  Returns the current value of errno for the calling code.
int zmq_errno (void)
{
    return errno;
}


111
//  New context API
Martin Sustrik's avatar
Martin Sustrik committed
112

113 114
void *zmq_ctx_new (void)
{
115 116 117 118 119 120
#if defined ZMQ_HAVE_OPENPGM

    //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
    //  protocol ID. Note that if you want to use gettimeofday and sleep for
    //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
    //  PGM_SLEEP to "USLEEP".
Steven McCoy's avatar
Steven McCoy committed
121
    pgm_error_t *pgm_error = NULL;
122 123
    const bool ok = pgm_init (&pgm_error);
    if (ok != TRUE) {
124 125 126 127 128 129 130

        //  Invalid parameters don't set pgm_error_t
        zmq_assert (pgm_error != NULL);
        if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
              pgm_error->code == PGM_ERROR_FAILED)) {

            //  Failed to access RTC or HPET device.
Steven McCoy's avatar
Steven McCoy committed
131
            pgm_error_free (pgm_error);
132 133 134
            errno = EINVAL;
            return NULL;
        }
135 136

        //  PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
137 138 139 140
        zmq_assert (false);
    }
#endif

141 142 143 144 145 146 147 148 149 150 151 152 153
#ifdef ZMQ_HAVE_WINDOWS
    //  Intialise Windows sockets. Note that WSAStartup can be called multiple
    //  times given that WSACleanup will be called for each WSAStartup.
   //  We do this before the ctx constructor since its embedded mailbox_t
   //  object needs Winsock to be up and running.
    WORD version_requested = MAKEWORD (2, 2);
    WSADATA wsa_data;
    int rc = WSAStartup (version_requested, &wsa_data);
    zmq_assert (rc == 0);
    zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
        HIBYTE (wsa_data.wVersion) == 2);
#endif

154
    //  Create 0MQ context.
155
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
156
    alloc_assert (ctx);
157 158 159
    return ctx;
}

160
int zmq_ctx_term (void *ctx_)
Martin Sustrik's avatar
Martin Sustrik committed
161
{
162
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
163 164 165
        errno = EFAULT;
        return -1;
    }
166

167 168 169
    int rc = ((zmq::ctx_t*) ctx_)->terminate ();
    int en = errno;

170 171
    //  Shut down only if termination was not interrupted by a signal.
    if (!rc || en != EINTR) {
172
#ifdef ZMQ_HAVE_WINDOWS
173 174 175
        //  On Windows, uninitialise socket layer.
        rc = WSACleanup ();
        wsa_assert (rc != SOCKET_ERROR);
176 177
#endif

178
#if defined ZMQ_HAVE_OPENPGM
179 180 181
        //  Shut down the OpenPGM library.
        if (pgm_shutdown () != TRUE)
            zmq_assert (false);
182
#endif
183
    }
184 185 186

    errno = en;
    return rc;
Martin Sustrik's avatar
Martin Sustrik committed
187 188
}

189 190 191 192 193 194 195 196 197 198
int zmq_ctx_shutdown (void *ctx_)
{
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
        errno = EFAULT;
        return -1;
    }

    return ((zmq::ctx_t*) ctx_)->shutdown ();
}

199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
int zmq_ctx_set (void *ctx_, int option_, int optval_)
{
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
        errno = EFAULT;
        return -1;
    }
    return ((zmq::ctx_t*) ctx_)->set (option_, optval_);
}

int zmq_ctx_get (void *ctx_, int option_)
{
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
        errno = EFAULT;
        return -1;
    }
    return ((zmq::ctx_t*) ctx_)->get (option_);
}

//  Stable/legacy context API

void *zmq_init (int io_threads_)
{
221 222 223 224 225 226
    if (io_threads_ >= 0) {
        void *ctx = zmq_ctx_new ();
        zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
        return ctx;
    }
    errno = EINVAL;
227
    return NULL;
228 229 230 231
}

int zmq_term (void *ctx_)
{
232 233 234 235 236 237
    return zmq_ctx_term (ctx_);
}

//  Alias that forwards straight to zmq_ctx_term ().
int zmq_ctx_destroy (void *ctx_)
{
    const int rc = zmq_ctx_term (ctx_);
    return rc;
}


// Sockets
242

243
void *zmq_socket (void *ctx_, int type_)
Martin Sustrik's avatar
Martin Sustrik committed
244
{
245
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
246 247 248
        errno = EFAULT;
        return NULL;
    }
249 250
    zmq::ctx_t *ctx = (zmq::ctx_t*) ctx_;
    zmq::socket_base_t *s = ctx->create_socket (type_);
251
    return (void *) s;
Martin Sustrik's avatar
Martin Sustrik committed
252 253
}

Martin Sustrik's avatar
Martin Sustrik committed
254
int zmq_close (void *s_)
Martin Sustrik's avatar
Martin Sustrik committed
255
{
256 257
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
258 259
        return -1;
    }
260
    ((zmq::socket_base_t*) s_)->close ();
Martin Sustrik's avatar
Martin Sustrik committed
261 262 263
    return 0;
}

264 265
int zmq_setsockopt (void *s_, int option_, const void *optval_,
    size_t optvallen_)
Martin Sustrik's avatar
Martin Sustrik committed
266
{
267 268
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
269 270
        return -1;
    }
skaller's avatar
skaller committed
271 272 273
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->setsockopt (option_, optval_, optvallen_);
    return result;
Martin Sustrik's avatar
Martin Sustrik committed
274 275
}

276 277
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
278 279
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
280 281
        return -1;
    }
skaller's avatar
skaller committed
282 283 284
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->getsockopt (option_, optval_, optvallen_);
    return result;
285 286
}

287 288 289 290 291 292 293 294 295 296 297
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->monitor (addr_, events_);
    return result;
}

298
int zmq_bind (void *s_, const char *addr_)
Martin Sustrik's avatar
Martin Sustrik committed
299
{
300 301
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
302 303
        return -1;
    }
skaller's avatar
skaller committed
304 305 306
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->bind (addr_);
    return result;
307 308 309 310
}

int zmq_connect (void *s_, const char *addr_)
{
311 312
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
313 314
        return -1;
    }
skaller's avatar
skaller committed
315 316 317 318 319
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->connect (addr_);
    return result;
}

320
int zmq_unbind (void *s_, const char *addr_)
321 322 323 324 325 326
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
327
    return s->term_endpoint (addr_);
328 329
}

330
int zmq_disconnect (void *s_, const char *addr_)
331 332 333 334 335 336
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
337
    return s->term_endpoint (addr_);
338 339
}

340 341
// Sending functions.

342
static int
343
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
skaller's avatar
skaller committed
344
{
345
    int sz = (int) zmq_msg_size (msg_);
skaller's avatar
skaller committed
346 347 348
    int rc = s_->send ((zmq::msg_t*) msg_, flags_);
    if (unlikely (rc < 0))
        return -1;
349
    return sz;
skaller's avatar
skaller committed
350 351
}

352
/*  To be deprecated once zmq_msg_send() is stable                           */
skaller's avatar
skaller committed
353 354
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
355
    return zmq_msg_send (msg_, s_, flags_);
Martin Sustrik's avatar
Martin Sustrik committed
356 357
}

358
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
359
{
skaller's avatar
skaller committed
360 361 362 363
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
364
    zmq_msg_t msg;
365 366
    int rc = zmq_msg_init_size (&msg, len_);
    if (rc != 0)
367 368 369
        return -1;
    memcpy (zmq_msg_data (&msg), buf_, len_);

skaller's avatar
skaller committed
370
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
371 372
    rc = s_sendmsg (s, &msg, flags_);
    if (unlikely (rc < 0)) {
373 374 375 376 377 378
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }
379

380 381
    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
382
    return rc;
Uli Köhler's avatar
Uli Köhler committed
383 384
}

385
int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
Uli Köhler's avatar
Uli Köhler committed
386 387 388 389 390 391
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq_msg_t msg;
392 393
    int rc = zmq_msg_init_data (&msg, (void*)buf_, len_, NULL, NULL);
    if (rc != 0)
Uli Köhler's avatar
Uli Köhler committed
394 395 396
        return -1;

    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
397 398
    rc = s_sendmsg (s, &msg, flags_);
    if (unlikely (rc < 0)) {
Uli Köhler's avatar
Uli Köhler committed
399 400 401 402 403 404
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }
405

Uli Köhler's avatar
Uli Köhler committed
406 407
    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
408
    return rc;
409 410
}

411

skaller's avatar
skaller committed
412
// Send multiple messages.
413
// TODO: this function has no man page
skaller's avatar
skaller committed
414 415 416
//
// If flag bit ZMQ_SNDMORE is set the vector is treated as
// a single multi-part message, i.e. the last message has
skaller's avatar
skaller committed
417
// ZMQ_SNDMORE bit switched off.
skaller's avatar
skaller committed
418
//
419
int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
skaller's avatar
skaller committed
420 421 422 423 424 425 426 427
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    int rc = 0;
    zmq_msg_t msg;
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
428

429
    for (size_t i = 0; i < count_; ++i) {
skaller's avatar
skaller committed
430
        rc = zmq_msg_init_size (&msg, a_[i].iov_len);
431
        if (rc != 0) {
skaller's avatar
skaller committed
432 433 434 435
            rc = -1;
            break;
        }
        memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
436 437
        if (i == count_ - 1)
            flags_ = flags_ & ~ZMQ_SNDMORE;
438 439
        rc = s_sendmsg (s, &msg, flags_);
        if (unlikely (rc < 0)) {
skaller's avatar
skaller committed
440 441 442 443 444 445 446 447
           int err = errno;
           int rc2 = zmq_msg_close (&msg);
           errno_assert (rc2 == 0);
           errno = err;
           rc = -1;
           break;
        }
    }
448
    return rc;
skaller's avatar
skaller committed
449 450
}

451 452
// Receiving functions.

453
static int
454
s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
skaller's avatar
skaller committed
455
{
456
    int rc = s_->recv ((zmq::msg_t*) msg_, flags_);
skaller's avatar
skaller committed
457 458
    if (unlikely (rc < 0))
        return -1;
459
    return (int) zmq_msg_size (msg_);
skaller's avatar
skaller committed
460 461
}

462
/*  To be deprecated once zmq_msg_recv() is stable                           */
skaller's avatar
skaller committed
463 464
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
465
    return zmq_msg_recv (msg_, s_, flags_);
skaller's avatar
skaller committed
466 467 468
}


469
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
470
{
skaller's avatar
skaller committed
471 472 473 474
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
475 476 477 478
    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg);
    errno_assert (rc == 0);

skaller's avatar
skaller committed
479
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
480
    int nbytes = s_recvmsg (s, &msg, flags_);
481
    if (unlikely (nbytes < 0)) {
482
        int err = errno;
483 484
        rc = zmq_msg_close (&msg);
        errno_assert (rc == 0);
485 486 487 488
        errno = err;
        return -1;
    }

489 490 491
    //  At the moment an oversized message is silently truncated.
    //  TODO: Build in a notification mechanism to report the overflows.
    size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
492
    memcpy (buf_, zmq_msg_data (&msg), to_copy);
493 494 495 496

    rc = zmq_msg_close (&msg);
    errno_assert (rc == 0);

497
    return nbytes;
498 499
}

skaller's avatar
skaller committed
500
// Receive a multi-part message
501
//
skaller's avatar
skaller committed
502 503 504 505 506 507 508 509 510 511
// Receives up to *count_ parts of a multi-part message.
// Sets *count_ to the actual number of parts read.
// ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
// Returns number of message parts read, or -1 on error.
//
// Note: even if -1 is returned, some parts of the message
// may have been read. Therefore the client must consult
// *count_ to retrieve message parts successfully read,
// even if -1 is returned.
//
512
// The iov_base* buffers of each iovec *a_ filled in by this
skaller's avatar
skaller committed
513
// function may be freed using free().
514
// TODO: this function has no man page
skaller's avatar
skaller committed
515
//
516
int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
skaller's avatar
skaller committed
517 518 519 520 521 522 523
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;

524
    size_t count = *count_;
skaller's avatar
skaller committed
525 526
    int nread = 0;
    bool recvmore = true;
527

528
    *count_ = 0;
skaller's avatar
skaller committed
529

530
    for (size_t i = 0; recvmore && i < count; ++i) {
531

skaller's avatar
skaller committed
532 533 534 535
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        errno_assert (rc == 0);

536
        int nbytes = s_recvmsg (s, &msg, flags_);
skaller's avatar
skaller committed
537 538 539 540 541 542 543 544 545 546
        if (unlikely (nbytes < 0)) {
            int err = errno;
            rc = zmq_msg_close (&msg);
            errno_assert (rc == 0);
            errno = err;
            nread = -1;
            break;
        }

        a_[i].iov_len = zmq_msg_size (&msg);
547
        a_[i].iov_base = static_cast<char *> (malloc(a_[i].iov_len));
548
        if (unlikely (!a_[i].iov_base)) {
549 550 551 552 553
            errno = ENOMEM;
            return -1;
        }
        memcpy(a_[i].iov_base,static_cast<char *> (zmq_msg_data (&msg)),
               a_[i].iov_len);
skaller's avatar
skaller committed
554
        // Assume zmq_socket ZMQ_RVCMORE is properly set.
555
        recvmore = ((zmq::msg_t*) (void *) &msg)->flags () & zmq::msg_t::more;
556 557 558 559
        rc = zmq_msg_close(&msg);
        errno_assert (rc == 0);
        ++*count_;
        ++nread;
skaller's avatar
skaller committed
560
    }
561
    return nread;
skaller's avatar
skaller committed
562 563
}

564 565
// Message manipulators.

566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581
//  Initialises msg_ via msg_t::init () and returns its result.
int zmq_msg_init (zmq_msg_t *msg_)
{
    zmq::msg_t *const message = reinterpret_cast<zmq::msg_t *> (msg_);
    return message->init ();
}

//  Initialises msg_ via msg_t::init_size (size_) and returns its result.
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
    zmq::msg_t *const message = reinterpret_cast<zmq::msg_t *> (msg_);
    return message->init_size (size_);
}

//  Initialises msg_ via msg_t::init_data () with the caller's buffer, free
//  function and hint, and returns its result.
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
    zmq_free_fn *ffn_, void *hint_)
{
    zmq::msg_t *const message = reinterpret_cast<zmq::msg_t *> (msg_);
    return message->init_data (data_, size_, ffn_, hint_);
}

582
//  Sends msg_ on socket s_ and returns what s_sendmsg () reports.
//  Returns -1 with errno set to ENOTSOCK when s_ is NULL or fails the tag
//  check.
int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *const sock = static_cast<zmq::socket_base_t *> (s_);
    if (sock == NULL || !sock->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    return s_sendmsg (sock, msg_, flags_);
}

593
//  Receives a message from socket s_ into msg_ and returns what
//  s_recvmsg () reports. Returns -1 with errno set to ENOTSOCK when s_ is
//  NULL or fails the tag check.
int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *const sock = static_cast<zmq::socket_base_t *> (s_);
    if (sock == NULL || !sock->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    return s_recvmsg (sock, msg_, flags_);
}

604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623
//  Closes msg_ via msg_t::close () and returns its result.
int zmq_msg_close (zmq_msg_t *msg_)
{
    zmq::msg_t *const message = reinterpret_cast<zmq::msg_t *> (msg_);
    return message->close ();
}

//  Calls msg_t::move () on dest_ with src_ as its argument and returns the
//  result.
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *const target = reinterpret_cast<zmq::msg_t *> (dest_);
    zmq::msg_t *const source = reinterpret_cast<zmq::msg_t *> (src_);
    return target->move (*source);
}

//  Calls msg_t::copy () on dest_ with src_ as its argument and returns the
//  result.
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *const target = reinterpret_cast<zmq::msg_t *> (dest_);
    zmq::msg_t *const source = reinterpret_cast<zmq::msg_t *> (src_);
    return target->copy (*source);
}

//  Returns the data pointer reported by msg_t::data () for msg_.
void *zmq_msg_data (zmq_msg_t *msg_)
{
    zmq::msg_t *const message = reinterpret_cast<zmq::msg_t *> (msg_);
    return message->data ();
}

624
//  Returns the size reported by msg_t::size () for msg_.
size_t zmq_msg_size (zmq_msg_t *msg_)
{
    zmq::msg_t *const message = reinterpret_cast<zmq::msg_t *> (msg_);
    return message->size ();
}

629 630
//  Shorthand for querying the ZMQ_MORE property through zmq_msg_get ().
int zmq_msg_more (zmq_msg_t *msg_)
{
    const int more = zmq_msg_get (msg_, ZMQ_MORE);
    return more;
}

634
//  Returns the value of an integer property of msg_:
//    ZMQ_MORE   -> 1 if the 'more' flag is set, else 0
//    ZMQ_SRCFD  -> the value of msg_t::fd () converted to int
//    ZMQ_SHARED -> 1 if the message is a cmsg or has the 'shared' flag set,
//                  else 0
//  Any other property yields -1 with errno set to EINVAL.
int zmq_msg_get (zmq_msg_t *msg_, int property_)
{
    zmq::msg_t *const message = reinterpret_cast<zmq::msg_t *> (msg_);

    if (property_ == ZMQ_MORE) {
        if (message->flags () & zmq::msg_t::more)
            return 1;
        return 0;
    }

    if (property_ == ZMQ_SRCFD)
        return (int) message->fd ();

    if (property_ == ZMQ_SHARED) {
        const bool is_shared = message->is_cmsg ()
            || (message->flags () & zmq::msg_t::shared);
        return is_shared ? 1 : 0;
    }

    errno = EINVAL;
    return -1;
}

650
//  Setting message properties is not supported: every call fails with -1
//  and errno set to EINVAL, whatever the arguments.
int zmq_msg_set (zmq_msg_t *, int, int)
{
    errno = EINVAL;
    return -1;
}

657 658 659 660 661 662 663 664 665
//  Passes routing_id_ to msg_t::set_routing_id () and returns its result.
int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
{
    zmq::msg_t *const message = reinterpret_cast<zmq::msg_t *> (msg_);
    return message->set_routing_id (routing_id_);
}

//  Returns the routing id reported by msg_t::get_routing_id () for msg_.
uint32_t zmq_msg_get_routing_id (zmq_msg_t *msg_)
{
    zmq::msg_t *const message = reinterpret_cast<zmq::msg_t *> (msg_);
    return message->get_routing_id ();
}
666 667 668

//  Get message metadata string

669
//  Looks up the metadata property named property_ on msg_.
//  Returns the value found by metadata_t::get (). Returns NULL with errno
//  set to EINVAL when property_ is NULL, when the message has no metadata,
//  or when the lookup yields no value.
const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
{
    const char *value = NULL;

    //  Constructing std::string from a NULL pointer is undefined, so a NULL
    //  property name is rejected like an unknown property.
    if (property_) {
        zmq::metadata_t *metadata = ((zmq::msg_t*) msg_)->metadata ();
        if (metadata)
            value = metadata->get (std::string (property_));
    }

    if (!value) {
        errno = EINVAL;
        return NULL;
    }
    return value;
}

683 684
// Polling.

685 686 687
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
#if defined ZMQ_POLL_BASED_ON_POLL
688 689 690 691 692 693 694 695
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
696
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
697
        return 0;
Mikko Koppanen's avatar
Mikko Koppanen committed
698 699 700
#elif defined ZMQ_HAVE_ANDROID
        usleep (timeout_ * 1000);
        return 0;
701
#else
702
        return usleep (timeout_ * 1000);
703 704
#endif
    }
705 706 707 708 709 710

    if (!items_) {
        errno = EFAULT;
        return -1;
    }

711 712 713
    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;
714 715
    pollfd spollfds[ZMQ_POLLITEMS_DFLT];
    pollfd *pollfds = spollfds;
716

717 718 719 720
    if (nitems_ > ZMQ_POLLITEMS_DFLT) {
        pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
        alloc_assert (pollfds);
    }
721

722
    //  Build pollset for poll () system call.
723 724
    for (int i = 0; i != nitems_; i++) {

725 726
        //  If the poll item is a 0MQ socket, we poll on the file descriptor
        //  retrieved by the ZMQ_FD socket option.
727
        if (items_ [i].socket) {
728
            size_t zmq_fd_size = sizeof (zmq::fd_t);
729 730
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
                &zmq_fd_size) == -1) {
731 732
                if (pollfds != spollfds)
                    free (pollfds);
733
                return -1;
734
            }
735
            pollfds [i].events = items_ [i].events ? POLLIN : 0;
736
        }
737 738
        //  Else, the poll item is a raw file descriptor. Just convert the
        //  events to normal POLLIN/POLLOUT for poll ().
Martin Lucina's avatar
Martin Lucina committed
739
        else {
740 741 742
            pollfds [i].fd = items_ [i].fd;
            pollfds [i].events =
                (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
743 744
                (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0) |
                (items_ [i].events & ZMQ_POLLPRI ? POLLPRI : 0);
Martin Lucina's avatar
Martin Lucina committed
745
        }
746 747
    }

748
    bool first_pass = true;
749
    int nevents = 0;
750

751
    while (true) {
752 753 754 755 756 757 758 759 760
        //  Compute the timeout for the subsequent poll.
        int timeout;
        if (first_pass)
            timeout = 0;
        else
        if (timeout_ < 0)
            timeout = -1;
        else
            timeout = end - now;
761

762
        //  Wait for events.
763
        while (true) {
764
            int rc = poll (pollfds, nitems_, timeout);
765
            if (rc == -1 && errno == EINTR) {
766 767
                if (pollfds != spollfds)
                    free (pollfds);
768
                return -1;
769
            }
770 771
            errno_assert (rc >= 0);
            break;
772
        }
773 774
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
775

776
            items_ [i].revents = 0;
777

778 779
            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
780
            if (items_ [i].socket) {
781 782 783 784
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                    &zmq_events_size) == -1) {
785 786
                    if (pollfds != spollfds)
                        free (pollfds);
787 788 789 790 791 792 793 794
                    return -1;
                }
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
795
            }
796 797 798 799 800 801 802
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (pollfds [i].revents & POLLIN)
                    items_ [i].revents |= ZMQ_POLLIN;
                if (pollfds [i].revents & POLLOUT)
                    items_ [i].revents |= ZMQ_POLLOUT;
803 804 805
                if (pollfds [i].revents & POLLPRI)
                   items_ [i].revents |= ZMQ_POLLPRI;
                if (pollfds [i].revents & ~(POLLIN | POLLOUT | POLLPRI))
806 807 808 809 810
                    items_ [i].revents |= ZMQ_POLLERR;
            }

            if (items_ [i].revents)
                nevents++;
811
        }
812

813 814 815
        //  If timout is zero, exit immediately whether there are events or not.
        if (timeout_ == 0)
            break;
816

817 818 819 820 821 822
        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
823 824 825
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
826
            continue;
827
        }
828

829 830 831 832 833 834
        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling have begun. (We assume that
        //  first pass have taken negligible time). We also compute the time
        //  when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
835
            end = now + timeout_;
836 837 838
            if (now == end)
                break;
            first_pass = false;
839 840
            continue;
        }
841

842 843 844 845
        //  Find out whether timeout have expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
846 847
    }

848 849
    if (pollfds != spollfds)
        free (pollfds);
850 851
    return nevents;

852
#elif defined ZMQ_POLL_BASED_ON_SELECT
853

854 855 856 857 858 859 860 861
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
862
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
863 864
        return 0;
#else
865
        return usleep (timeout_ * 1000);
866 867
#endif
    }
868 869 870 871 872 873 874 875
    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;

    //  Ensure we do not attempt to select () on more than FD_SETSIZE
    //  file descriptors.
    zmq_assert (nitems_ <= FD_SETSIZE);

876 877 878 879 880 881 882
    fd_set pollset_in;
    FD_ZERO (&pollset_in);
    fd_set pollset_out;
    FD_ZERO (&pollset_out);
    fd_set pollset_err;
    FD_ZERO (&pollset_err);

883
    zmq::fd_t maxfd = 0;
884

885
    //  Build the fd_sets for passing to select ().
886 887
    for (int i = 0; i != nitems_; i++) {

888 889
        //  If the poll item is a 0MQ socket we are interested in input on the
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
890
        if (items_ [i].socket) {
891 892 893 894 895
            size_t zmq_fd_size = sizeof (zmq::fd_t);
            zmq::fd_t notify_fd;
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
                &zmq_fd_size) == -1)
                return -1;
896 897 898 899 900
            if (items_ [i].events) {
                FD_SET (notify_fd, &pollset_in);
                if (maxfd < notify_fd)
                    maxfd = notify_fd;
            }
901
        }
902 903 904 905 906 907 908 909 910 911 912
        //  Else, the poll item is a raw file descriptor. Convert the poll item
        //  events to the appropriate fd_sets.
        else {
            if (items_ [i].events & ZMQ_POLLIN)
                FD_SET (items_ [i].fd, &pollset_in);
            if (items_ [i].events & ZMQ_POLLOUT)
                FD_SET (items_ [i].fd, &pollset_out);
            if (items_ [i].events & ZMQ_POLLERR)
                FD_SET (items_ [i].fd, &pollset_err);
            if (maxfd < items_ [i].fd)
                maxfd = items_ [i].fd;
913 914 915
        }
    }

916
    bool first_pass = true;
917
    int nevents = 0;
918
    fd_set inset, outset, errset;
919 920

    while (true) {
921

922 923 924 925 926 927 928 929
        //  Compute the timeout for the subsequent poll.
        timeval timeout;
        timeval *ptimeout;
        if (first_pass) {
            timeout.tv_sec = 0;
            timeout.tv_usec = 0;
            ptimeout = &timeout;
        }
930 931
        else
        if (timeout_ < 0)
932 933 934 935 936 937 938
            ptimeout = NULL;
        else {
            timeout.tv_sec = (long) ((end - now) / 1000);
            timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
            ptimeout = &timeout;
        }

939 940 941 942 943
        //  Wait for events. Ignore interrupts if there's infinite timeout.
        while (true) {
            memcpy (&inset, &pollset_in, sizeof (fd_set));
            memcpy (&outset, &pollset_out, sizeof (fd_set));
            memcpy (&errset, &pollset_err, sizeof (fd_set));
944
#if defined ZMQ_HAVE_WINDOWS
945
            int rc = select (0, &inset, &outset, &errset, ptimeout);
946
            if (unlikely (rc == SOCKET_ERROR)) {
947 948 949
                errno = zmq::wsa_error_to_errno (WSAGetLastError ());
                wsa_assert (errno == ENOTSOCK);
                return -1;
950
            }
951
#else
952
            int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
Bernd Prager's avatar
Bernd Prager committed
953
            if (unlikely (rc == -1)) {
954 955
                errno_assert (errno == EINTR || errno == EBADF);
                return -1;
956
            }
957
#endif
958 959
            break;
        }
960

961 962
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
963

964
            items_ [i].revents = 0;
965

966 967 968
            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
            if (items_ [i].socket) {
969 970 971 972
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                      &zmq_events_size) == -1)
973
                    return -1;
974 975 976 977 978 979
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
980 981 982 983 984
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (FD_ISSET (items_ [i].fd, &inset))
985
                    items_ [i].revents |= ZMQ_POLLIN;
986 987 988 989
                if (FD_ISSET (items_ [i].fd, &outset))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (FD_ISSET (items_ [i].fd, &errset))
                    items_ [i].revents |= ZMQ_POLLERR;
990
            }
991 992 993

            if (items_ [i].revents)
                nevents++;
994
        }
995

996 997 998
        //  If timeout is zero, exit immediately whether there are events or not.
        if (timeout_ == 0)
            break;
999

1000 1001 1002 1003 1004 1005
        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
1006 1007 1008
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
1009
            continue;
1010
        }
1011

1012 1013 1014 1015 1016 1017
        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling has begun. (We assume that
        //  the first pass has taken negligible time). We also compute the time
        //  when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
1018
            end = now + timeout_;
1019 1020 1021
            if (now == end)
                break;
            first_pass = false;
1022 1023
            continue;
        }
1024

1025 1026 1027 1028
        //  Find out whether the timeout has expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
1029 1030 1031 1032 1033
    }

    return nevents;

#else
1034
    //  Exotic platforms that support neither poll() nor select().
1035 1036
    errno = ENOTSUP;
    return -1;
1037 1038 1039
#endif
}

1040 1041
//  The proxy functionality

1042 1043
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
{
1044 1045 1046 1047
    if (!frontend_ || !backend_) {
        errno = EFAULT;
        return -1;
    }
1048
    return zmq::proxy (
1049 1050
        (zmq::socket_base_t*) frontend_,
        (zmq::socket_base_t*) backend_,
1051 1052 1053 1054
        (zmq::socket_base_t*) capture_);
}

int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_)
Pieter Hintjens's avatar
Pieter Hintjens committed
1055
{
1056 1057 1058 1059
    if (!frontend_ || !backend_) {
        errno = EFAULT;
        return -1;
    }
1060
    return zmq::proxy (
1061 1062
        (zmq::socket_base_t*) frontend_,
        (zmq::socket_base_t*) backend_,
1063
        (zmq::socket_base_t*) capture_,
1064 1065
        (zmq::socket_base_t*) control_);
}
Pieter Hintjens's avatar
Pieter Hintjens committed
1066

1067
//  The deprecated device functionality
Pieter Hintjens's avatar
Pieter Hintjens committed
1068

1069
int zmq_device (int /* type */, void *frontend_, void *backend_)
1070 1071
{
    return zmq::proxy (
1072
        (zmq::socket_base_t*) frontend_,
1073
        (zmq::socket_base_t*) backend_, NULL);
Pieter Hintjens's avatar
Pieter Hintjens committed
1074
}
1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106

//  Probe library capabilities; for now, reports on transport and security

//  Tells the caller whether this build of the library provides the named
//  capability. The name is compared exactly (case-sensitive) against the
//  capabilities enabled at compile time: "ipc", "pgm", "tipc", "norm",
//  "curve" and "gssapi". Returns non-zero (true) when the capability is
//  compiled in and zero (false) otherwise, including for a NULL name.
int zmq_has (const char *capability)
{
    //  strcmp () must never be given a NULL pointer; a missing name simply
    //  matches no capability.
    if (!capability)
        return false;

#if !defined (ZMQ_HAVE_WINDOWS) && !defined (ZMQ_HAVE_OPENVMS)
    if (strcmp (capability, "ipc") == 0)
        return true;
#endif
#if defined (ZMQ_HAVE_OPENPGM)
    if (strcmp (capability, "pgm") == 0)
        return true;
#endif
#if defined (ZMQ_HAVE_TIPC)
    if (strcmp (capability, "tipc") == 0)
        return true;
#endif
#if defined (ZMQ_HAVE_NORM)
    if (strcmp (capability, "norm") == 0)
        return true;
#endif
#if defined (HAVE_LIBSODIUM)
    if (strcmp (capability, "curve") == 0)
        return true;
#endif
#if defined (HAVE_LIBGSSAPI_KRB5)
    if (strcmp (capability, "gssapi") == 0)
        return true;
#endif
    //  Whatever the application asked for, we don't have
    return false;
}