zmq.cpp 28.2 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3 4 5 6

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
7
    the terms of the GNU Lesser General Public License as published by
Martin Sustrik's avatar
Martin Sustrik committed
8 9 10 11 12 13
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
    GNU Lesser General Public License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
15

16
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
17 18
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
19
#define ZMQ_TYPE_UNSAFE
Martin Sustrik's avatar
Martin Sustrik committed
20

21
#include "poller.hpp"
22 23 24 25 26 27

//  On AIX platform, poll.h has to be included first to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
#if defined ZMQ_POLL_BASED_ON_POLL
28 29 30
#include <poll.h>
#endif

AJ Lewis's avatar
AJ Lewis committed
31 32 33
// zmq.h must be included *after* poll.h for AIX to build properly
#include "../include/zmq.h"

34
#if defined ZMQ_HAVE_WINDOWS
35 36 37 38 39
#include "windows.hpp"
#else
#include <unistd.h>
#endif

skaller's avatar
skaller committed
40

skaller's avatar
skaller committed
41
// XSI vector I/O
42
#if defined ZMQ_HAVE_UIO
skaller's avatar
skaller committed
43 44
#include <sys/uio.h>
#else
45
//  Fallback definition of the XSI iovec structure, compiled only when
//  <sys/uio.h> is not available (ZMQ_HAVE_UIO is not defined).
struct iovec
{
    //  Start of the memory region.
    void *iov_base;

    //  Size of the memory region, in bytes.
    size_t iov_len;
};
#endif


52
#include <string.h>
#include <stdlib.h>
#include <limits.h>
#include <new>

56
#include "proxy.hpp"
57
#include "socket_base.hpp"
58
#include "stdint.hpp"
59
#include "config.hpp"
60
#include "likely.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
61
#include "clock.hpp"
62
#include "ctx.hpp"
63
#include "err.hpp"
64
#include "msg.hpp"
65
#include "fd.hpp"
66

67 68 69
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#endif
Martin Sustrik's avatar
Martin Sustrik committed
70

71
#if defined ZMQ_HAVE_OPENPGM
72
#define __PGM_WININT_H__
73 74 75
#include <pgm/pgm.h>
#endif

76 77 78 79
//  Compile time check whether msg_t fits into zmq_msg_t.
//  The array bound evaluates to -1, which fails to compile, unless
//  zmq::msg_t and zmq_msg_t have exactly the same size.
typedef char check_msg_t_size [
    sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];

80

81 82
void zmq_version (int *major_, int *minor_, int *patch_)
{
Martin Sustrik's avatar
Martin Sustrik committed
83 84 85
    *major_ = ZMQ_VERSION_MAJOR;
    *minor_ = ZMQ_VERSION_MINOR;
    *patch_ = ZMQ_VERSION_PATCH;
86 87
}

88

89 90
const char *zmq_strerror (int errnum_)
{
91
    return zmq::errno_to_string (errnum_);
92 93
}

94
//  Returns the current value of errno for the calling thread.
int zmq_errno (void)
{
    const int current_error = errno;
    return current_error;
}


100
//  New context API
Martin Sustrik's avatar
Martin Sustrik committed
101

102 103
void *zmq_ctx_new (void)
{
104 105 106 107 108 109
#if defined ZMQ_HAVE_OPENPGM

    //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
    //  protocol ID. Note that if you want to use gettimeofday and sleep for
    //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
    //  PGM_SLEEP to "USLEEP".
Steven McCoy's avatar
Steven McCoy committed
110
    pgm_error_t *pgm_error = NULL;
111 112
    const bool ok = pgm_init (&pgm_error);
    if (ok != TRUE) {
113 114 115 116 117 118 119

        //  Invalid parameters don't set pgm_error_t
        zmq_assert (pgm_error != NULL);
        if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
              pgm_error->code == PGM_ERROR_FAILED)) {

            //  Failed to access RTC or HPET device.
Steven McCoy's avatar
Steven McCoy committed
120
            pgm_error_free (pgm_error);
121 122 123
            errno = EINVAL;
            return NULL;
        }
124 125

        //  PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
126 127 128 129
        zmq_assert (false);
    }
#endif

130 131 132 133 134 135 136 137 138 139 140 141 142
#ifdef ZMQ_HAVE_WINDOWS
    //  Intialise Windows sockets. Note that WSAStartup can be called multiple
    //  times given that WSACleanup will be called for each WSAStartup.
   //  We do this before the ctx constructor since its embedded mailbox_t
   //  object needs Winsock to be up and running.
    WORD version_requested = MAKEWORD (2, 2);
    WSADATA wsa_data;
    int rc = WSAStartup (version_requested, &wsa_data);
    zmq_assert (rc == 0);
    zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
        HIBYTE (wsa_data.wVersion) == 2);
#endif

143
    //  Create 0MQ context.
144
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
145
    alloc_assert (ctx);
146 147 148
    return ctx;
}

149
int zmq_ctx_term (void *ctx_)
Martin Sustrik's avatar
Martin Sustrik committed
150
{
151
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
152 153 154
        errno = EFAULT;
        return -1;
    }
155

156 157 158
    int rc = ((zmq::ctx_t*) ctx_)->terminate ();
    int en = errno;

159 160
    //  Shut down only if termination was not interrupted by a signal.
    if (!rc || en != EINTR) {
161
#ifdef ZMQ_HAVE_WINDOWS
162 163 164
        //  On Windows, uninitialise socket layer.
        rc = WSACleanup ();
        wsa_assert (rc != SOCKET_ERROR);
165 166
#endif

167
#if defined ZMQ_HAVE_OPENPGM
168 169 170
        //  Shut down the OpenPGM library.
        if (pgm_shutdown () != TRUE)
            zmq_assert (false);
171
#endif
172
    }
173 174 175

    errno = en;
    return rc;
Martin Sustrik's avatar
Martin Sustrik committed
176 177
}

178 179 180 181 182 183 184 185 186 187
int zmq_ctx_shutdown (void *ctx_)
{
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
        errno = EFAULT;
        return -1;
    }

    return ((zmq::ctx_t*) ctx_)->shutdown ();
}

188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
int zmq_ctx_set (void *ctx_, int option_, int optval_)
{
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
        errno = EFAULT;
        return -1;
    }
    return ((zmq::ctx_t*) ctx_)->set (option_, optval_);
}

int zmq_ctx_get (void *ctx_, int option_)
{
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
        errno = EFAULT;
        return -1;
    }
    return ((zmq::ctx_t*) ctx_)->get (option_);
}

//  Stable/legacy context API

void *zmq_init (int io_threads_)
{
210 211 212 213 214 215 216
    if (io_threads_ >= 0) {
        void *ctx = zmq_ctx_new ();
        zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
        return ctx;
    }
    errno = EINVAL;
    return NULL;   
217 218 219 220
}

int zmq_term (void *ctx_)
{
221 222 223 224 225 226
    return zmq_ctx_term (ctx_);
}

//  Alternative name for zmq_ctx_term (); forwards the call unchanged.
int zmq_ctx_destroy (void *ctx_)
{
    const int outcome = zmq_ctx_term (ctx_);
    return outcome;
}


// Sockets
231

232
void *zmq_socket (void *ctx_, int type_)
Martin Sustrik's avatar
Martin Sustrik committed
233
{
234
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
235 236 237
        errno = EFAULT;
        return NULL;
    }
238 239
    zmq::ctx_t *ctx = (zmq::ctx_t*) ctx_;
    zmq::socket_base_t *s = ctx->create_socket (type_);
240
    return (void *) s;
Martin Sustrik's avatar
Martin Sustrik committed
241 242
}

Martin Sustrik's avatar
Martin Sustrik committed
243
int zmq_close (void *s_)
Martin Sustrik's avatar
Martin Sustrik committed
244
{
245 246
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
247 248
        return -1;
    }
249
    ((zmq::socket_base_t*) s_)->close ();
Martin Sustrik's avatar
Martin Sustrik committed
250 251 252
    return 0;
}

253 254
int zmq_setsockopt (void *s_, int option_, const void *optval_,
    size_t optvallen_)
Martin Sustrik's avatar
Martin Sustrik committed
255
{
256 257
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
258 259
        return -1;
    }
skaller's avatar
skaller committed
260 261 262
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->setsockopt (option_, optval_, optvallen_);
    return result;
Martin Sustrik's avatar
Martin Sustrik committed
263 264
}

265 266
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
267 268
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
269 270
        return -1;
    }
skaller's avatar
skaller committed
271 272 273
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->getsockopt (option_, optval_, optvallen_);
    return result;
274 275
}

276 277 278 279 280 281 282 283 284 285 286
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->monitor (addr_, events_);
    return result;
}

287
int zmq_bind (void *s_, const char *addr_)
Martin Sustrik's avatar
Martin Sustrik committed
288
{
289 290
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
291 292
        return -1;
    }
skaller's avatar
skaller committed
293 294 295
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->bind (addr_);
    return result;
296 297 298 299
}

int zmq_connect (void *s_, const char *addr_)
{
300 301
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
302 303
        return -1;
    }
skaller's avatar
skaller committed
304 305 306 307 308
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->connect (addr_);
    return result;
}

309
int zmq_unbind (void *s_, const char *addr_)
310 311 312 313 314 315
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
316
    return s->term_endpoint (addr_);
317 318
}

319
int zmq_disconnect (void *s_, const char *addr_)
320 321 322 323 324 325
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
326
    return s->term_endpoint (addr_);
327 328
}

329 330
// Sending functions.

331 332
//  Sends msg_ on the socket s_ with the given flags.
//
//  Returns the size of the message in bytes as measured before the send,
//  clamped to INT_MAX, or -1 if socket_base_t::send () reports a failure
//  (errno is left as set by that call).
static int
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
    //  The size is captured up front, before the message is handed to
    //  send ().
    const size_t sz = zmq_msg_size (msg_);
    const int rc = s_->send ((zmq::msg_t*) msg_, flags_);
    if (unlikely (rc < 0))
        return -1;

    //  Clamp rather than cast: a plain (int) conversion of a size above
    //  INT_MAX could yield a negative value that callers would mistake
    //  for a failure.
    const size_t max_msgsz = INT_MAX;
    return (int) (sz < max_msgsz ? sz : max_msgsz);
}

341
/*  To be deprecated once zmq_msg_send() is stable                           */
skaller's avatar
skaller committed
342 343
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
344
    return zmq_msg_send (msg_, s_, flags_);
Martin Sustrik's avatar
Martin Sustrik committed
345 346
}

347 348
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
{
skaller's avatar
skaller committed
349 350 351 352
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
353 354 355 356 357 358
    zmq_msg_t msg;
    int rc = zmq_msg_init_size (&msg, len_);
    if (rc != 0)
        return -1;
    memcpy (zmq_msg_data (&msg), buf_, len_);

skaller's avatar
skaller committed
359
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
360
    rc = s_sendmsg (s, &msg, flags_);
361 362 363 364 365 366 367 368 369 370
    if (unlikely (rc < 0)) {
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }
    
    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
Uli Köhler's avatar
Uli Köhler committed
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396
    return rc;
}

int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq_msg_t msg;
    int rc = zmq_msg_init_data (&msg, (void*)buf_, len_, NULL, NULL);
    if (rc != 0)
        return -1;

    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    rc = s_sendmsg (s, &msg, flags_);
    if (unlikely (rc < 0)) {
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }
    
    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
397 398 399
    return rc;
}

400

skaller's avatar
skaller committed
401
// Send multiple messages.
402
// TODO: this function has no man page
skaller's avatar
skaller committed
403 404 405
//
// If flag bit ZMQ_SNDMORE is set the vector is treated as
// a single multi-part message, i.e. the last message has
skaller's avatar
skaller committed
406
// ZMQ_SNDMORE bit switched off.
skaller's avatar
skaller committed
407
//
408
int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
skaller's avatar
skaller committed
409 410 411 412 413 414 415 416
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    int rc = 0;
    zmq_msg_t msg;
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
417 418
    
    for (size_t i = 0; i < count_; ++i) {
skaller's avatar
skaller committed
419
        rc = zmq_msg_init_size (&msg, a_[i].iov_len);
420
        if (rc != 0) {
skaller's avatar
skaller committed
421 422 423 424
            rc = -1;
            break;
        }
        memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
425 426 427
        if (i == count_ - 1)
            flags_ = flags_ & ~ZMQ_SNDMORE;
        rc = s_sendmsg (s, &msg, flags_);
skaller's avatar
skaller committed
428 429 430 431 432 433 434 435 436 437 438 439
        if (unlikely (rc < 0)) {
           int err = errno;
           int rc2 = zmq_msg_close (&msg);
           errno_assert (rc2 == 0);
           errno = err;
           rc = -1;
           break;
        }
    }
    return rc; 
}

440 441
// Receiving functions.

442 443
//  Receives one message from the socket s_ into msg_ with the given flags.
//
//  Returns the size of the received message in bytes, clamped to INT_MAX,
//  or -1 if socket_base_t::recv () reports a failure (errno is left as set
//  by that call).
static int
s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
    const int rc = s_->recv ((zmq::msg_t*) msg_, flags_);
    if (unlikely (rc < 0))
        return -1;

    //  Clamp rather than cast: a plain (int) conversion of a size above
    //  INT_MAX could yield a negative value that callers would mistake
    //  for a failure.
    const size_t sz = zmq_msg_size (msg_);
    const size_t max_msgsz = INT_MAX;
    return (int) (sz < max_msgsz ? sz : max_msgsz);
}

451
/*  To be deprecated once zmq_msg_recv() is stable                           */
skaller's avatar
skaller committed
452 453
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
454
    return zmq_msg_recv (msg_, s_, flags_);
skaller's avatar
skaller committed
455 456 457
}


458 459
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
{
skaller's avatar
skaller committed
460 461 462 463
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
464 465 466 467
    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg);
    errno_assert (rc == 0);

skaller's avatar
skaller committed
468
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
469
    int nbytes = s_recvmsg (s, &msg, flags_);
470
    if (unlikely (nbytes < 0)) {
471
        int err = errno;
472 473
        rc = zmq_msg_close (&msg);
        errno_assert (rc == 0);
474 475 476 477 478 479
        errno = err;
        return -1;
    }

    //  At the moment an oversized message is silently truncated.
    //  TODO: Build in a notification mechanism to report the overflows.
480
    size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
481
    memcpy (buf_, zmq_msg_data (&msg), to_copy);
482 483 484 485

    rc = zmq_msg_close (&msg);
    errno_assert (rc == 0);

486
    return nbytes;
487 488
}

skaller's avatar
skaller committed
489 490 491 492 493 494 495 496 497 498 499 500 501 502
// Receive a multi-part message
// 
// Receives up to *count_ parts of a multi-part message.
// Sets *count_ to the actual number of parts read.
// ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
// Returns number of message parts read, or -1 on error.
//
// Note: even if -1 is returned, some parts of the message
// may have been read. Therefore the client must consult
// *count_ to retrieve message parts successfully read,
// even if -1 is returned.
//
// The iov_base* buffers of each iovec *a_ filled in by this 
// function may be freed using free().
503
// TODO: this function has no man page
skaller's avatar
skaller committed
504
//
505
int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
skaller's avatar
skaller committed
506 507 508 509 510 511 512
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;

513
    size_t count = *count_;
skaller's avatar
skaller committed
514 515
    int nread = 0;
    bool recvmore = true;
516 517
    
    *count_ = 0;
skaller's avatar
skaller committed
518

519
    for (size_t i = 0; recvmore && i < count; ++i) {
520
       
skaller's avatar
skaller committed
521 522 523 524
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        errno_assert (rc == 0);

525
        int nbytes = s_recvmsg (s, &msg, flags_);
skaller's avatar
skaller committed
526 527 528 529 530 531 532 533 534 535
        if (unlikely (nbytes < 0)) {
            int err = errno;
            rc = zmq_msg_close (&msg);
            errno_assert (rc == 0);
            errno = err;
            nread = -1;
            break;
        }

        a_[i].iov_len = zmq_msg_size (&msg);
536
        a_[i].iov_base = malloc(a_[i].iov_len);
537
        if (unlikely (!a_[i].iov_base)) {
538 539 540 541 542
            errno = ENOMEM;
            return -1;
        }
        memcpy(a_[i].iov_base,static_cast<char *> (zmq_msg_data (&msg)),
               a_[i].iov_len);
skaller's avatar
skaller committed
543
        // Assume zmq_socket ZMQ_RVCMORE is properly set.
544
        recvmore = ((zmq::msg_t*) (void *) &msg)->flags () & zmq::msg_t::more;
545 546 547 548
        rc = zmq_msg_close(&msg);
        errno_assert (rc == 0);
        ++*count_;
        ++nread;
skaller's avatar
skaller committed
549
    }
550
    return nread;
skaller's avatar
skaller committed
551 552
}

553 554
// Message manipulators.

555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570
//  Initialises msg_ by forwarding to msg_t::init () and returns its result.
int zmq_msg_init (zmq_msg_t *msg_)
{
    zmq::msg_t *const impl = (zmq::msg_t *) msg_;
    return impl->init ();
}

//  Initialises msg_ with room for size_ bytes by forwarding to
//  msg_t::init_size () and returns its result.
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
    zmq::msg_t *const impl = (zmq::msg_t *) msg_;
    return impl->init_size (size_);
}

//  Initialises msg_ over the caller-supplied buffer data_ of size_ bytes,
//  passing the deallocation function ffn_ and its hint_ along to
//  msg_t::init_data (). Returns the result of that call.
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
    zmq_free_fn *ffn_, void *hint_)
{
    zmq::msg_t *const impl = (zmq::msg_t *) msg_;
    return impl->init_data (data_, size_, ffn_, hint_);
}

571 572 573 574 575 576 577
//  Sends msg_ on the socket s_ and returns the result of s_sendmsg ().
//  Returns -1 with errno set to ENOTSOCK if s_ is NULL or fails its
//  check_tag () validation.
int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *const sock = static_cast<zmq::socket_base_t *> (s_);
    if (sock == NULL || !sock->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }

    return s_sendmsg (sock, msg_, flags_);
}

//  Receives a message from the socket s_ into msg_ and returns the result
//  of s_recvmsg ().
//  Returns -1 with errno set to ENOTSOCK if s_ is NULL or fails its
//  check_tag () validation.
int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *const sock = static_cast<zmq::socket_base_t *> (s_);
    if (sock == NULL || !sock->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }

    return s_recvmsg (sock, msg_, flags_);
}

593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612
//  Closes msg_ by forwarding to msg_t::close () and returns its result.
int zmq_msg_close (zmq_msg_t *msg_)
{
    zmq::msg_t *const impl = (zmq::msg_t *) msg_;
    return impl->close ();
}

//  Forwards to msg_t::move () with dest_ as the target and src_ as the
//  source, and returns its result.
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *const target = (zmq::msg_t *) dest_;
    zmq::msg_t *const source = (zmq::msg_t *) src_;
    return target->move (*source);
}

//  Forwards to msg_t::copy () with dest_ as the target and src_ as the
//  source, and returns its result.
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *const target = (zmq::msg_t *) dest_;
    zmq::msg_t *const source = (zmq::msg_t *) src_;
    return target->copy (*source);
}

//  Returns the data pointer of msg_, as reported by msg_t::data ().
void *zmq_msg_data (zmq_msg_t *msg_)
{
    zmq::msg_t *const impl = (zmq::msg_t *) msg_;
    return impl->data ();
}

613
//  Returns the size of msg_, as reported by msg_t::size ().
size_t zmq_msg_size (zmq_msg_t *msg_)
{
    zmq::msg_t *const impl = (zmq::msg_t *) msg_;
    return impl->size ();
}

618 619
//  Shorthand for querying the ZMQ_MORE property of msg_ through
//  zmq_msg_get ().
int zmq_msg_more (zmq_msg_t *msg_)
{
    const int more = zmq_msg_get (msg_, ZMQ_MORE);
    return more;
}

623
//  Reads an integer property of msg_:
//   - ZMQ_MORE:  1 if the message has the "more" flag set, 0 otherwise;
//   - ZMQ_SRCFD: the value returned by msg_t::fd ().
//  Any other property_ yields -1 with errno set to EINVAL.
int zmq_msg_get (zmq_msg_t *msg_, int property_)
{
    zmq::msg_t *const impl = (zmq::msg_t *) msg_;

    if (property_ == ZMQ_MORE)
        return (impl->flags () & zmq::msg_t::more) ? 1 : 0;

    if (property_ == ZMQ_SRCFD)
        return impl->fd ();

    errno = EINVAL;
    return -1;
}

636
//  Setter counterpart of zmq_msg_get (). No property can be set at
//  present, so every call fails with -1 and errno set to EINVAL.
int zmq_msg_set (zmq_msg_t *, int, int)
{
    const int unsupported = EINVAL;
    errno = unsupported;
    return -1;
}

643 644 645 646 647 648 649 650 651

//  Get message metadata string

//  Looks up a metadata string of msg_ by property name. Neither argument
//  is examined at present: every property is unknown and NULL is returned.
char *zmq_msg_gets (zmq_msg_t *msg_, char *property_)
{
    (void) msg_;
    (void) property_;

    char *const no_value = NULL;
    return no_value;
}

652 653
// Polling.

654 655 656
//  Waits for events on a set of 0MQ sockets and/or raw file descriptors.
//
//  items_    array of nitems_ poll items; for each item the requested
//            events are read from 'events' and the detected ones are
//            written to 'revents'. An item with a non-NULL 'socket' is
//            treated as a 0MQ socket, otherwise its 'fd' is polled.
//  nitems_   number of entries in items_.
//  timeout_  milliseconds to wait; 0 returns immediately, a negative
//            value waits without a time limit.
//
//  Returns the number of items with a non-zero 'revents' (0 if the timeout
//  expired first), or -1 on failure:
//   - EINVAL  nitems_ is negative;
//   - EFAULT  items_ is NULL while nitems_ is positive;
//   - ENOTSUP the platform provides neither poll () nor select ();
//   - otherwise errno as left by zmq_getsockopt () or by the underlying
//     poll () / select () call (e.g. EINTR).
//
//  With nitems_ == 0 the call just sleeps for timeout_ (no sleep when
//  timeout_ is 0).
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
#if defined ZMQ_POLL_BASED_ON_POLL
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
        return 0;
#elif defined ZMQ_HAVE_ANDROID
        usleep (timeout_ * 1000);
        return 0;
#else
        return usleep (timeout_ * 1000);
#endif
    }

    if (!items_) {
        errno = EFAULT;
        return -1;
    }

    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;

    //  Small poll sets live on the stack; larger ones are heap-allocated
    //  and must be freed on every exit path below.
    pollfd spollfds[ZMQ_POLLITEMS_DFLT];
    pollfd *pollfds = spollfds;

    if (nitems_ > ZMQ_POLLITEMS_DFLT) {
        pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
        alloc_assert (pollfds);
    }

    //  Build pollset for poll () system call.
    for (int i = 0; i != nitems_; i++) {

        //  If the poll item is a 0MQ socket, we poll on the file descriptor
        //  retrieved by the ZMQ_FD socket option.
        if (items_ [i].socket) {
            size_t zmq_fd_size = sizeof (zmq::fd_t);
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
                &zmq_fd_size) == -1) {
                if (pollfds != spollfds)
                    free (pollfds);
                return -1;
            }
            pollfds [i].events = items_ [i].events ? POLLIN : 0;
        }
        //  Else, the poll item is a raw file descriptor. Just convert the
        //  events to normal POLLIN/POLLOUT for poll ().
        else {
            pollfds [i].fd = items_ [i].fd;
            pollfds [i].events =
                (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
                (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
        }
    }

    bool first_pass = true;
    int nevents = 0;

    while (true) {
        //  Compute the timeout for the subsequent poll. The first pass
        //  never blocks; it only checks for events already pending.
        int timeout;
        if (first_pass)
            timeout = 0;
        else
        if (timeout_ < 0)
            timeout = -1;
        else
            timeout = end - now;

        //  Wait for events. An interrupted poll () is reported to the
        //  caller rather than retried.
        int rc = poll (pollfds, nitems_, timeout);
        if (rc == -1 && errno == EINTR) {
            if (pollfds != spollfds)
                free (pollfds);
            return -1;
        }
        errno_assert (rc >= 0);

        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {

            items_ [i].revents = 0;

            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
            if (items_ [i].socket) {
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                    &zmq_events_size) == -1) {
                    if (pollfds != spollfds)
                        free (pollfds);
                    return -1;
                }
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (pollfds [i].revents & POLLIN)
                    items_ [i].revents |= ZMQ_POLLIN;
                if (pollfds [i].revents & POLLOUT)
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (pollfds [i].revents & ~(POLLIN | POLLOUT))
                    items_ [i].revents |= ZMQ_POLLERR;
            }

            if (items_ [i].revents)
                nevents++;
        }

        //  If timeout is zero, exit immediately whether there are events
        //  or not.
        if (timeout_ == 0)
            break;

        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
            continue;
        }

        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling has begun. (We assume that
        //  the first pass has taken negligible time). We also compute the
        //  time when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
            end = now + timeout_;
            if (now == end)
                break;
            first_pass = false;
            continue;
        }

        //  Find out whether the timeout has expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
    }

    if (pollfds != spollfds)
        free (pollfds);
    return nevents;

#elif defined ZMQ_POLL_BASED_ON_SELECT

    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
        return 0;
#else
        return usleep (timeout_ * 1000);
#endif
    }

    //  Same guard as in the poll ()-based implementation: never
    //  dereference a missing item array.
    if (!items_) {
        errno = EFAULT;
        return -1;
    }

    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;

    //  Ensure we do not attempt to select () on more than FD_SETSIZE
    //  file descriptors.
    zmq_assert (nitems_ <= FD_SETSIZE);

    fd_set pollset_in;
    FD_ZERO (&pollset_in);
    fd_set pollset_out;
    FD_ZERO (&pollset_out);
    fd_set pollset_err;
    FD_ZERO (&pollset_err);

    zmq::fd_t maxfd = 0;

    //  Build the fd_sets for passing to select ().
    for (int i = 0; i != nitems_; i++) {

        //  If the poll item is a 0MQ socket we are interested in input on the
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
        if (items_ [i].socket) {
            size_t zmq_fd_size = sizeof (zmq::fd_t);
            zmq::fd_t notify_fd;
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
                &zmq_fd_size) == -1)
                return -1;
            if (items_ [i].events) {
                FD_SET (notify_fd, &pollset_in);
                if (maxfd < notify_fd)
                    maxfd = notify_fd;
            }
        }
        //  Else, the poll item is a raw file descriptor. Convert the poll item
        //  events to the appropriate fd_sets.
        else {
            if (items_ [i].events & ZMQ_POLLIN)
                FD_SET (items_ [i].fd, &pollset_in);
            if (items_ [i].events & ZMQ_POLLOUT)
                FD_SET (items_ [i].fd, &pollset_out);
            if (items_ [i].events & ZMQ_POLLERR)
                FD_SET (items_ [i].fd, &pollset_err);
            if (maxfd < items_ [i].fd)
                maxfd = items_ [i].fd;
        }
    }

    bool first_pass = true;
    int nevents = 0;
    fd_set inset, outset, errset;

    while (true) {

        //  Compute the timeout for the subsequent select. The first pass
        //  never blocks; it only checks for events already pending.
        timeval timeout;
        timeval *ptimeout;
        if (first_pass) {
            timeout.tv_sec = 0;
            timeout.tv_usec = 0;
            ptimeout = &timeout;
        }
        else
        if (timeout_ < 0)
            ptimeout = NULL;
        else {
            timeout.tv_sec = (long) ((end - now) / 1000);
            timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
            ptimeout = &timeout;
        }

        //  Wait for events. select () modifies the sets it is given, so it
        //  works on fresh copies of the master sets in every pass. A failed
        //  select () (including an interrupted one) is reported to the
        //  caller rather than retried.
        memcpy (&inset, &pollset_in, sizeof (fd_set));
        memcpy (&outset, &pollset_out, sizeof (fd_set));
        memcpy (&errset, &pollset_err, sizeof (fd_set));
#if defined ZMQ_HAVE_WINDOWS
        int rc = select (0, &inset, &outset, &errset, ptimeout);
        if (unlikely (rc == SOCKET_ERROR)) {
            errno = zmq::wsa_error_to_errno (WSAGetLastError ());
            wsa_assert (errno == ENOTSOCK);
            return -1;
        }
#else
        int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
        if (unlikely (rc == -1)) {
            errno_assert (errno == EINTR || errno == EBADF);
            return -1;
        }
#endif

        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {

            items_ [i].revents = 0;

            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
            if (items_ [i].socket) {
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                      &zmq_events_size) == -1)
                    return -1;
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (FD_ISSET (items_ [i].fd, &inset))
                    items_ [i].revents |= ZMQ_POLLIN;
                if (FD_ISSET (items_ [i].fd, &outset))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (FD_ISSET (items_ [i].fd, &errset))
                    items_ [i].revents |= ZMQ_POLLERR;
            }

            if (items_ [i].revents)
                nevents++;
        }

        //  If timeout is zero, exit immediately whether there are events
        //  or not.
        if (timeout_ == 0)
            break;

        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
            continue;
        }

        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling has begun. (We assume that
        //  the first pass has taken negligible time). We also compute the
        //  time when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
            end = now + timeout_;
            if (now == end)
                break;
            first_pass = false;
            continue;
        }

        //  Find out whether the timeout has expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
    }

    return nevents;

#else
    //  Exotic platforms that support neither poll() nor select().
    errno = ENOTSUP;
    return -1;
#endif
}

1006 1007
//  The proxy functionality

1008 1009
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
{
1010 1011 1012 1013
    if (!frontend_ || !backend_) {
        errno = EFAULT;
        return -1;
    }
1014
    return zmq::proxy (
1015 1016
        (zmq::socket_base_t*) frontend_,
        (zmq::socket_base_t*) backend_,
1017 1018 1019 1020
        (zmq::socket_base_t*) capture_);
}

int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_)
Pieter Hintjens's avatar
Pieter Hintjens committed
1021
{
1022 1023 1024 1025
    if (!frontend_ || !backend_) {
        errno = EFAULT;
        return -1;
    }
1026
    return zmq::proxy (
1027 1028
        (zmq::socket_base_t*) frontend_,
        (zmq::socket_base_t*) backend_,
1029
        (zmq::socket_base_t*) capture_,
1030 1031
        (zmq::socket_base_t*) control_);
}
Pieter Hintjens's avatar
Pieter Hintjens committed
1032

1033
//  The deprecated device functionality
Pieter Hintjens's avatar
Pieter Hintjens committed
1034

1035
int zmq_device (int /* type */, void *frontend_, void *backend_)
1036 1037
{
    return zmq::proxy (
1038
        (zmq::socket_base_t*) frontend_,
1039
        (zmq::socket_base_t*) backend_, NULL);
Pieter Hintjens's avatar
Pieter Hintjens committed
1040
}