zmq.cpp 25.4 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2012 iMatix Corporation
Martin Sustrik's avatar
Martin Sustrik committed
3
    Copyright (c) 2009-2011 250bpm s.r.o.
4
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
5 6 7 8

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
9
    the terms of the GNU Lesser General Public License as published by
Martin Sustrik's avatar
Martin Sustrik committed
10 11 12 13 14 15
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
    GNU Lesser General Public License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
17

18
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
19 20
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
21 22
#define ZMQ_TYPE_UNSAFE
#include "../include/zmq.h"
Martin Sustrik's avatar
Martin Sustrik committed
23

24 25
#include "platform.hpp"

26 27 28 29 30
#if defined ZMQ_FORCE_SELECT
#define ZMQ_POLL_BASED_ON_SELECT
#elif defined ZMQ_FORCE_POLL
#define ZMQ_POLL_BASED_ON_POLL
#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
31 32 33 34
    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
    defined ZMQ_HAVE_NETBSD
35 36
#define ZMQ_POLL_BASED_ON_POLL
#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS ||\
37
     defined ZMQ_HAVE_CYGWIN
38 39 40 41 42 43 44 45
#define ZMQ_POLL_BASED_ON_SELECT
#endif

//  On AIX platform, poll.h has to be included first to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
#if defined ZMQ_POLL_BASED_ON_POLL
46 47 48
#include <poll.h>
#endif

49
#if defined ZMQ_HAVE_WINDOWS
50 51 52 53 54
#include "windows.hpp"
#else
#include <unistd.h>
#endif

skaller's avatar
skaller committed
55

skaller's avatar
skaller committed
56 57 58 59
// XSI vector I/O
#if ZMQ_HAVE_UIO
#include <sys/uio.h>
#else
60
struct iovec {
skaller's avatar
skaller committed
61 62 63 64 65 66
    void *iov_base;
    size_t iov_len;
};
#endif


67
#include <string.h>
Martin Sustrik's avatar
Martin Sustrik committed
68 69 70 71
#include <errno.h>
#include <stdlib.h>
#include <new>

72
#include "socket_base.hpp"
73
#include "stdint.hpp"
74
#include "config.hpp"
75
#include "likely.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
76
#include "clock.hpp"
77
#include "ctx.hpp"
78
#include "err.hpp"
79
#include "msg.hpp"
80
#include "fd.hpp"
81

82 83 84
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#endif
Martin Sustrik's avatar
Martin Sustrik committed
85

86
#if defined ZMQ_HAVE_OPENPGM
87
#define __PGM_WININT_H__
88 89 90
#include <pgm/pgm.h>
#endif

91 92 93 94
//  Compile time check whether msg_t fits into zmq_msg_t.
typedef char check_msg_t_size
    [sizeof (zmq::msg_t) ==  sizeof (zmq_msg_t) ? 1 : -1];

95 96
// Version.

97 98
void zmq_version (int *major_, int *minor_, int *patch_)
{
Martin Sustrik's avatar
Martin Sustrik committed
99 100 101
    *major_ = ZMQ_VERSION_MAJOR;
    *minor_ = ZMQ_VERSION_MINOR;
    *patch_ = ZMQ_VERSION_PATCH;
102 103
}

104 105
// Errors.

106 107
const char *zmq_strerror (int errnum_)
{
108
    return zmq::errno_to_string (errnum_);
109 110
}

111 112 113 114 115 116 117
int zmq_errno ()
{
    return errno;
}

// Contexts.

118
static zmq::ctx_t *s_init (int io_threads_)
Martin Sustrik's avatar
Martin Sustrik committed
119
{
120 121 122 123
    if (io_threads_ < 0) {
        errno = EINVAL;
        return NULL;
    }
Martin Sustrik's avatar
Martin Sustrik committed
124

125 126 127 128 129 130
#if defined ZMQ_HAVE_OPENPGM

    //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
    //  protocol ID. Note that if you want to use gettimeofday and sleep for
    //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
    //  PGM_SLEEP to "USLEEP".
Steven McCoy's avatar
Steven McCoy committed
131
    pgm_error_t *pgm_error = NULL;
132 133
    const bool ok = pgm_init (&pgm_error);
    if (ok != TRUE) {
134 135 136 137 138 139 140

        //  Invalid parameters don't set pgm_error_t
        zmq_assert (pgm_error != NULL);
        if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
              pgm_error->code == PGM_ERROR_FAILED)) {

            //  Failed to access RTC or HPET device.
Steven McCoy's avatar
Steven McCoy committed
141
            pgm_error_free (pgm_error);
142 143 144
            errno = EINVAL;
            return NULL;
        }
145 146

        //  PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
147 148 149 150
        zmq_assert (false);
    }
#endif

151 152 153 154 155 156 157 158 159 160 161 162 163
#ifdef ZMQ_HAVE_WINDOWS
    //  Intialise Windows sockets. Note that WSAStartup can be called multiple
    //  times given that WSACleanup will be called for each WSAStartup.
   //  We do this before the ctx constructor since its embedded mailbox_t
   //  object needs Winsock to be up and running.
    WORD version_requested = MAKEWORD (2, 2);
    WSADATA wsa_data;
    int rc = WSAStartup (version_requested, &wsa_data);
    zmq_assert (rc == 0);
    zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
        HIBYTE (wsa_data.wVersion) == 2);
#endif

164
    //  Create 0MQ context.
165
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
166
    alloc_assert (ctx);
167 168 169 170 171
    return ctx;
}

void *zmq_init (int io_threads_)
{
172
    return (void *) s_init (io_threads_);
173 174
}

175
int zmq_term (void *ctx_)
Martin Sustrik's avatar
Martin Sustrik committed
176
{
177
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
178 179 180
        errno = EFAULT;
        return -1;
    }
181 182 183
    int rc = ((zmq::ctx_t*) ctx_)->terminate ();
    int en = errno;

184 185 186 187 188 189
#ifdef ZMQ_HAVE_WINDOWS
    //  On Windows, uninitialise socket layer.
    rc = WSACleanup ();
    wsa_assert (rc != SOCKET_ERROR);
#endif

190 191 192 193 194 195 196 197
#if defined ZMQ_HAVE_OPENPGM
    //  Shut down the OpenPGM library.
    if (pgm_shutdown () != TRUE)
        zmq_assert (false);
#endif

    errno = en;
    return rc;
Martin Sustrik's avatar
Martin Sustrik committed
198 199
}

200 201
// Sockets.

202
void *zmq_socket (void *ctx_, int type_)
Martin Sustrik's avatar
Martin Sustrik committed
203
{
204
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
205 206 207
        errno = EFAULT;
        return NULL;
    }
208 209
    zmq::ctx_t *ctx = (zmq::ctx_t*) ctx_;
    zmq::socket_base_t *s = ctx->create_socket (type_);
210
    return (void *) s;
Martin Sustrik's avatar
Martin Sustrik committed
211 212
}

Martin Sustrik's avatar
Martin Sustrik committed
213
int zmq_close (void *s_)
Martin Sustrik's avatar
Martin Sustrik committed
214
{
215 216
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
217 218
        return -1;
    }
219
    ((zmq::socket_base_t*) s_)->close ();
Martin Sustrik's avatar
Martin Sustrik committed
220 221 222
    return 0;
}

223 224
int zmq_setsockopt (void *s_, int option_, const void *optval_,
    size_t optvallen_)
Martin Sustrik's avatar
Martin Sustrik committed
225
{
226 227
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
228 229
        return -1;
    }
skaller's avatar
skaller committed
230 231 232
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->setsockopt (option_, optval_, optvallen_);
    return result;
Martin Sustrik's avatar
Martin Sustrik committed
233 234
}

235 236
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
237 238
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
239 240
        return -1;
    }
skaller's avatar
skaller committed
241 242 243
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->getsockopt (option_, optval_, optvallen_);
    return result;
244 245
}

246
int zmq_bind (void *s_, const char *addr_)
Martin Sustrik's avatar
Martin Sustrik committed
247
{
248 249
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
250 251
        return -1;
    }
skaller's avatar
skaller committed
252 253 254
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->bind (addr_);
    return result;
255 256 257 258
}

int zmq_connect (void *s_, const char *addr_)
{
259 260
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
261 262
        return -1;
    }
skaller's avatar
skaller committed
263 264 265 266 267
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->connect (addr_);
    return result;
}

268 269
// Sending functions.

270 271
static int
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
skaller's avatar
skaller committed
272 273 274 275 276 277 278 279
{
    int sz = (int) zmq_msg_size (msg_);
    int rc = s_->send ((zmq::msg_t*) msg_, flags_);
    if (unlikely (rc < 0))
        return -1;
    return sz;
}

280
/*  To be deprecated once zmq_msg_send() is stable                           */
skaller's avatar
skaller committed
281 282
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
283
    return zmq_msg_send (msg_, s_, flags_);
Martin Sustrik's avatar
Martin Sustrik committed
284 285
}

286 287
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
{
skaller's avatar
skaller committed
288 289 290 291
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
292 293 294 295 296 297
    zmq_msg_t msg;
    int rc = zmq_msg_init_size (&msg, len_);
    if (rc != 0)
        return -1;
    memcpy (zmq_msg_data (&msg), buf_, len_);

skaller's avatar
skaller committed
298
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
299
    rc = s_sendmsg (s, &msg, flags_);
300 301 302 303 304 305 306 307 308 309 310 311 312
    if (unlikely (rc < 0)) {
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }
    
    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
    return rc;
}

skaller's avatar
skaller committed
313 314 315 316
// Send multiple messages.
//
// If flag bit ZMQ_SNDMORE is set the vector is treated as
// a single multi-part message, i.e. the last message has
skaller's avatar
skaller committed
317
// ZMQ_SNDMORE bit switched off.
skaller's avatar
skaller committed
318
//
319
int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
skaller's avatar
skaller committed
320 321 322 323 324 325 326 327
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    int rc = 0;
    zmq_msg_t msg;
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
328 329
    
    for (size_t i = 0; i < count_; ++i) {
skaller's avatar
skaller committed
330
        rc = zmq_msg_init_size (&msg, a_[i].iov_len);
331
        if (rc != 0) {
skaller's avatar
skaller committed
332 333 334 335
            rc = -1;
            break;
        }
        memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
336 337 338
        if (i == count_ - 1)
            flags_ = flags_ & ~ZMQ_SNDMORE;
        rc = s_sendmsg (s, &msg, flags_);
skaller's avatar
skaller committed
339 340 341 342 343 344 345 346 347 348 349 350
        if (unlikely (rc < 0)) {
           int err = errno;
           int rc2 = zmq_msg_close (&msg);
           errno_assert (rc2 == 0);
           errno = err;
           rc = -1;
           break;
        }
    }
    return rc; 
}

351 352
// Receiving functions.

353 354
static int
s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
skaller's avatar
skaller committed
355
{
356
    int rc = s_->recv ((zmq::msg_t*) msg_, flags_);
skaller's avatar
skaller committed
357 358 359 360 361
    if (unlikely (rc < 0))
        return -1;
    return (int) zmq_msg_size (msg_);
}

362
/*  To be deprecated once zmq_msg_recv() is stable                           */
skaller's avatar
skaller committed
363 364
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
365
    return zmq_msg_recv (msg_, s_, flags_);
skaller's avatar
skaller committed
366 367 368
}


369 370
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
{
skaller's avatar
skaller committed
371 372 373 374
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
375 376 377 378
    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg);
    errno_assert (rc == 0);

skaller's avatar
skaller committed
379
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
380
    int nbytes = s_recvmsg (s, &msg, flags_);
381
    if (unlikely (nbytes < 0)) {
382
        int err = errno;
383 384
        rc = zmq_msg_close (&msg);
        errno_assert (rc == 0);
385 386 387 388 389 390
        errno = err;
        return -1;
    }

    //  At the moment an oversized message is silently truncated.
    //  TODO: Build in a notification mechanism to report the overflows.
391
    size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
392
    memcpy (buf_, zmq_msg_data (&msg), to_copy);
393 394 395 396

    rc = zmq_msg_close (&msg);
    errno_assert (rc == 0);

397
    return nbytes;
398 399
}

skaller's avatar
skaller committed
400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419
// Receive a multi-part message
// 
// Receives up to *count_ parts of a multi-part message.
// Sets *count_ to the actual number of parts read.
// ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
// Returns number of message parts read, or -1 on error.
//
// Note: even if -1 is returned, some parts of the message
// may have been read. Therefore the client must consult
// *count_ to retrieve message parts successfully read,
// even if -1 is returned.
//
// The iov_base* buffers of each iovec *a_ filled in by this 
// function may be freed using free().
//
// Implementation note: We assume zmq::msg_t buffer allocated
// by zmq::recvmsg can be freed by free().
// We assume it is safe to steal these buffers by simply
// not closing the zmq::msg_t.
//
420
int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
skaller's avatar
skaller committed
421 422 423 424 425 426 427
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;

428
    size_t count = (int) *count_;
skaller's avatar
skaller committed
429 430 431
    int nread = 0;
    bool recvmore = true;

432
    for (size_t i = 0; recvmore && i < count; ++i) {
skaller's avatar
skaller committed
433 434 435 436 437 438
        // Cheat! We never close any msg
        // because we want to steal the buffer.
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        errno_assert (rc == 0);

439
        int nbytes = s_recvmsg (s, &msg, flags_);
skaller's avatar
skaller committed
440 441 442 443 444 445 446 447
        if (unlikely (nbytes < 0)) {
            int err = errno;
            rc = zmq_msg_close (&msg);
            errno_assert (rc == 0);
            errno = err;
            nread = -1;
            break;
        }
448
        ++*count_;
skaller's avatar
skaller committed
449 450 451 452 453 454 455
        ++nread;

        // Cheat: acquire zmq_msg buffer.
        a_[i].iov_base = zmq_msg_data (&msg);
        a_[i].iov_len = zmq_msg_size (&msg);

        // Assume zmq_socket ZMQ_RVCMORE is properly set.
456
        recvmore = ((zmq::msg_t*) (void *) &msg)->flags () & zmq::msg_t::more;
skaller's avatar
skaller committed
457
    }
458
    return nread;
skaller's avatar
skaller committed
459 460
}

461 462
// Message manipulators.

463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478
int zmq_msg_init (zmq_msg_t *msg_)
{
    return ((zmq::msg_t*) msg_)->init ();
}

int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
    return ((zmq::msg_t*) msg_)->init_size (size_);
}

int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
    zmq_free_fn *ffn_, void *hint_)
{
    return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_);
}

479 480 481 482 483 484 485
int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
Pieter Hintjens's avatar
Pieter Hintjens committed
486
    int result = s_sendmsg (s, msg_, flags_);
487 488 489 490 491 492 493 494 495 496
    return result;
}

int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
Pieter Hintjens's avatar
Pieter Hintjens committed
497
    int result = s_recvmsg (s, msg_, flags_);
498 499 500
    return result;
}

501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520
int zmq_msg_close (zmq_msg_t *msg_)
{
    return ((zmq::msg_t*) msg_)->close ();
}

int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    return ((zmq::msg_t*) dest_)->move (*(zmq::msg_t*) src_);
}

int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    return ((zmq::msg_t*) dest_)->copy (*(zmq::msg_t*) src_);
}

void *zmq_msg_data (zmq_msg_t *msg_)
{
    return ((zmq::msg_t*) msg_)->data ();
}

521
size_t zmq_msg_size (zmq_msg_t *msg_)
522 523 524 525
{
    return ((zmq::msg_t*) msg_)->size ();
}

526 527 528 529
int zmq_msg_more (zmq_msg_t *msg_)
{
    int more;
    size_t more_size = sizeof (more);
530
    int rc = zmq_msg_get (msg_, ZMQ_MORE, &more, &more_size);
531 532 533 534
    assert (rc == 0);
    return more;
}

535
int zmq_msg_get (zmq_msg_t *msg_, int option_, void *optval_,
536 537 538
    size_t *optvallen_)
{
    switch (option_) {
539 540 541 542 543 544 545 546 547 548
        case ZMQ_MORE:
            if (*optvallen_ < sizeof (int)) {
                errno = EINVAL;
                return -1;
            }
            *((int*) optval_) =
                (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more)? 1 : 0;
            *optvallen_ = sizeof (int);
            return 0;
        default:
549 550
            errno = EINVAL;
            return -1;
551 552 553
    }
}

554
int zmq_msg_set (zmq_msg_t *msg_, int option_, const void *optval_,
555 556 557 558 559
    size_t *optvallen_)
{
    //  No options supported at present
    errno = EINVAL;
    return -1;
560 561
}

562 563
// Polling.

564 565
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
566 567 568 569
    if (!items_) {
        errno = EFAULT;
        return -1;
    }
570
#if defined ZMQ_POLL_BASED_ON_POLL
571 572 573 574 575 576 577 578
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
579
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
580
        return 0;
Mikko Koppanen's avatar
Mikko Koppanen committed
581 582 583
#elif defined ZMQ_HAVE_ANDROID
        usleep (timeout_ * 1000);
        return 0;
584
#else
585
        return usleep (timeout_ * 1000);
586 587
#endif
    }
588 589 590 591
    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;

592
    pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
593
    alloc_assert (pollfds);
594

595
    //  Build pollset for poll () system call.
596 597
    for (int i = 0; i != nitems_; i++) {

598 599
        //  If the poll item is a 0MQ socket, we poll on the file descriptor
        //  retrieved by the ZMQ_FD socket option.
600
        if (items_ [i].socket) {
601
            size_t zmq_fd_size = sizeof (zmq::fd_t);
602 603 604 605
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
                &zmq_fd_size) == -1) {
                free (pollfds);
                return -1;
606
            }
607
            pollfds [i].events = items_ [i].events ? POLLIN : 0;
608
        }
609 610
        //  Else, the poll item is a raw file descriptor. Just convert the
        //  events to normal POLLIN/POLLOUT for poll ().
Martin Lucina's avatar
Martin Lucina committed
611
        else {
612 613 614 615
            pollfds [i].fd = items_ [i].fd;
            pollfds [i].events =
                (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
                (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
Martin Lucina's avatar
Martin Lucina committed
616
        }
617 618
    }

619
    bool first_pass = true;
620
    int nevents = 0;
621

622
    while (true) {
623

624 625 626 627 628 629 630 631 632
         //  Compute the timeout for the subsequent poll.
         int timeout;
         if (first_pass)
             timeout = 0;
         else if (timeout_ < 0)
             timeout = -1;
         else
             timeout = end - now;

633
        //  Wait for events.
634
        while (true) {
635
            int rc = poll (pollfds, nitems_, timeout);
636
            if (rc == -1 && errno == EINTR) {
637 638
                free (pollfds);
                return -1;
639
            }
640 641
            errno_assert (rc >= 0);
            break;
642
        }
643

644 645
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
646

647
            items_ [i].revents = 0;
648

649 650
            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
651
            if (items_ [i].socket) {
652 653 654 655 656 657 658 659 660 661 662 663 664
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                    &zmq_events_size) == -1) {
                    free (pollfds);
                    return -1;
                }
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
665
            }
666 667 668 669 670 671 672 673 674 675 676 677 678
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (pollfds [i].revents & POLLIN)
                    items_ [i].revents |= ZMQ_POLLIN;
                if (pollfds [i].revents & POLLOUT)
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (pollfds [i].revents & ~(POLLIN | POLLOUT))
                    items_ [i].revents |= ZMQ_POLLERR;
            }

            if (items_ [i].revents)
                nevents++;
679
        }
680

681 682 683
        //  If timout is zero, exit immediately whether there are events or not.
        if (timeout_ == 0)
            break;
684

685 686 687 688 689 690
        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
691 692 693
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
694
            continue;
695
        }
696

697 698 699 700 701 702
        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling have begun. (We assume that
        //  first pass have taken negligible time). We also compute the time
        //  when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
703
            end = now + timeout_;
704 705 706
            if (now == end)
                break;
            first_pass = false;
707 708
            continue;
        }
709

710 711 712 713
        //  Find out whether timeout have expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
714 715 716 717 718
    }

    free (pollfds);
    return nevents;

719
#elif defined ZMQ_POLL_BASED_ON_SELECT
720

721 722 723 724 725 726 727 728
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
729
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
730 731
        return 0;
#else
732
        return usleep (timeout_ * 1000);
733 734
#endif
    }
735 736 737 738 739 740 741 742
    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;

    //  Ensure we do not attempt to select () on more than FD_SETSIZE
    //  file descriptors.
    zmq_assert (nitems_ <= FD_SETSIZE);

743 744 745 746 747 748 749
    fd_set pollset_in;
    FD_ZERO (&pollset_in);
    fd_set pollset_out;
    FD_ZERO (&pollset_out);
    fd_set pollset_err;
    FD_ZERO (&pollset_err);

750
    zmq::fd_t maxfd = 0;
751

752
    //  Build the fd_sets for passing to select ().
753 754
    for (int i = 0; i != nitems_; i++) {

755 756
        //  If the poll item is a 0MQ socket we are interested in input on the
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
757
        if (items_ [i].socket) {
758 759 760 761 762
            size_t zmq_fd_size = sizeof (zmq::fd_t);
            zmq::fd_t notify_fd;
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
                &zmq_fd_size) == -1)
                return -1;
763 764 765 766 767
            if (items_ [i].events) {
                FD_SET (notify_fd, &pollset_in);
                if (maxfd < notify_fd)
                    maxfd = notify_fd;
            }
768
        }
769 770 771 772 773 774 775 776 777 778 779
        //  Else, the poll item is a raw file descriptor. Convert the poll item
        //  events to the appropriate fd_sets.
        else {
            if (items_ [i].events & ZMQ_POLLIN)
                FD_SET (items_ [i].fd, &pollset_in);
            if (items_ [i].events & ZMQ_POLLOUT)
                FD_SET (items_ [i].fd, &pollset_out);
            if (items_ [i].events & ZMQ_POLLERR)
                FD_SET (items_ [i].fd, &pollset_err);
            if (maxfd < items_ [i].fd)
                maxfd = items_ [i].fd;
780 781 782
        }
    }

783
    bool first_pass = true;
784
    int nevents = 0;
785
    fd_set inset, outset, errset;
786 787

    while (true) {
788

789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804
        //  Compute the timeout for the subsequent poll.
        timeval timeout;
        timeval *ptimeout;
        if (first_pass) {
            timeout.tv_sec = 0;
            timeout.tv_usec = 0;
            ptimeout = &timeout;
        }
        else if (timeout_ < 0)
            ptimeout = NULL;
        else {
            timeout.tv_sec = (long) ((end - now) / 1000);
            timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
            ptimeout = &timeout;
        }

805 806 807 808 809
        //  Wait for events. Ignore interrupts if there's infinite timeout.
        while (true) {
            memcpy (&inset, &pollset_in, sizeof (fd_set));
            memcpy (&outset, &pollset_out, sizeof (fd_set));
            memcpy (&errset, &pollset_err, sizeof (fd_set));
810
#if defined ZMQ_HAVE_WINDOWS
811
            int rc = select (0, &inset, &outset, &errset, ptimeout);
812
            if (unlikely (rc == SOCKET_ERROR)) {
813
                zmq::wsa_error_to_errno ();
814 815 816 817
                if (errno == ENOTSOCK)
                    return -1;
                wsa_assert (false);
            }
818
#else
819
            int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
Bernd Prager's avatar
Bernd Prager committed
820
            if (unlikely (rc == -1)) {
821 822 823 824
                if (errno == EINTR || errno == EBADF)
                    return -1;
                errno_assert (false);
            }
825
#endif
826 827
            break;
        }
828

829 830
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
831

832
            items_ [i].revents = 0;
833

834 835 836
            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
            if (items_ [i].socket) {
837 838 839 840
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                      &zmq_events_size) == -1)
841
                    return -1;
842 843 844 845 846 847
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
848 849 850 851 852
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (FD_ISSET (items_ [i].fd, &inset))
853
                    items_ [i].revents |= ZMQ_POLLIN;
854 855 856 857
                if (FD_ISSET (items_ [i].fd, &outset))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (FD_ISSET (items_ [i].fd, &errset))
                    items_ [i].revents |= ZMQ_POLLERR;
858
            }
859 860 861

            if (items_ [i].revents)
                nevents++;
862
        }
863

864 865 866
        //  If timout is zero, exit immediately whether there are events or not.
        if (timeout_ == 0)
            break;
867

868 869 870 871 872 873
        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
874 875 876
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
877
            continue;
878
        }
879

880 881 882 883 884 885
        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling have begun. (We assume that
        //  first pass have taken negligible time). We also compute the time
        //  when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
886
            end = now + timeout_;
887 888 889
            if (now == end)
                break;
            first_pass = false;
890 891
            continue;
        }
892

893 894 895 896
        //  Find out whether timeout have expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
897 898 899 900 901
    }

    return nevents;

#else
902
    //  Exotic platforms that support neither poll() nor select().
903 904
    errno = ENOTSUP;
    return -1;
905 906 907
#endif
}

908 909 910 911 912 913
#if defined ZMQ_POLL_BASED_ON_SELECT
#undef ZMQ_POLL_BASED_ON_SELECT
#endif
#if defined ZMQ_POLL_BASED_ON_POLL
#undef ZMQ_POLL_BASED_ON_POLL
#endif