zmq.cpp 25.6 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
Martin Sustrik's avatar
Martin Sustrik committed
2
    Copyright (c) 2009-2011 250bpm s.r.o.
3 4
    Copyright (c) 2007-2011 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
5 6 7 8

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
9
    the terms of the GNU Lesser General Public License as published by
Martin Sustrik's avatar
Martin Sustrik committed
10 11 12 13 14 15
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
    GNU Lesser General Public License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
17

18
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
19 20 21
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

22 23
#include "platform.hpp"

24 25 26 27 28
#if defined ZMQ_FORCE_SELECT
#define ZMQ_POLL_BASED_ON_SELECT
#elif defined ZMQ_FORCE_POLL
#define ZMQ_POLL_BASED_ON_POLL
#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
29 30 31 32
    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
    defined ZMQ_HAVE_NETBSD
33 34 35 36 37 38 39 40 41 42 43
#define ZMQ_POLL_BASED_ON_POLL
#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS ||\
	defined ZMQ_HAVE_CYGWIN
#define ZMQ_POLL_BASED_ON_SELECT
#endif

//  On AIX platform, poll.h has to be included first to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
#if defined ZMQ_POLL_BASED_ON_POLL
44 45 46
#include <poll.h>
#endif

47
#if defined ZMQ_HAVE_WINDOWS
48 49 50 51 52
#include "windows.hpp"
#else
#include <unistd.h>
#endif

skaller's avatar
skaller committed
53

skaller's avatar
skaller committed
54 55 56 57 58 59 60 61 62 63 64 65
// XSI vector I/O
#if ZMQ_HAVE_UIO
#include <sys/uio.h>
#else
struct iovec 
{
    void *iov_base;
    size_t iov_len;
};
#endif


66
#include <string.h>
Martin Sustrik's avatar
Martin Sustrik committed
67 68 69 70
#include <errno.h>
#include <stdlib.h>
#include <new>

71
#include "socket_base.hpp"
72
#include "stdint.hpp"
73
#include "config.hpp"
74
#include "likely.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
75
#include "clock.hpp"
76
#include "ctx.hpp"
77
#include "err.hpp"
78
#include "msg.hpp"
79
#include "fd.hpp"
80

81 82 83
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#endif
Martin Sustrik's avatar
Martin Sustrik committed
84

85
#if defined ZMQ_HAVE_OPENPGM
86
#define __PGM_WININT_H__
87 88 89
#include <pgm/pgm.h>
#endif

90 91 92 93
//  Compile time check whether msg_t fits into zmq_msg_t.
typedef char check_msg_t_size
    [sizeof (zmq::msg_t) ==  sizeof (zmq_msg_t) ? 1 : -1];

94 95
// Version.

96 97
void zmq_version (int *major_, int *minor_, int *patch_)
{
Martin Sustrik's avatar
Martin Sustrik committed
98 99 100
    *major_ = ZMQ_VERSION_MAJOR;
    *minor_ = ZMQ_VERSION_MINOR;
    *patch_ = ZMQ_VERSION_PATCH;
101 102
}

103 104
// Errors.

105 106
const char *zmq_strerror (int errnum_)
{
107
    return zmq::errno_to_string (errnum_);
108 109
}

110 111 112 113 114 115 116
int zmq_errno ()
{
    return errno;
}

// Contexts.

117
static zmq::ctx_t *inner_init (int io_threads_)
Martin Sustrik's avatar
Martin Sustrik committed
118
{
119 120 121 122
    if (io_threads_ < 0) {
        errno = EINVAL;
        return NULL;
    }
Martin Sustrik's avatar
Martin Sustrik committed
123

124 125 126 127 128 129
#if defined ZMQ_HAVE_OPENPGM

    //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
    //  protocol ID. Note that if you want to use gettimeofday and sleep for
    //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
    //  PGM_SLEEP to "USLEEP".
Steven McCoy's avatar
Steven McCoy committed
130
    pgm_error_t *pgm_error = NULL;
131 132
    const bool ok = pgm_init (&pgm_error);
    if (ok != TRUE) {
133 134 135 136 137 138 139

        //  Invalid parameters don't set pgm_error_t
        zmq_assert (pgm_error != NULL);
        if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
              pgm_error->code == PGM_ERROR_FAILED)) {

            //  Failed to access RTC or HPET device.
Steven McCoy's avatar
Steven McCoy committed
140
            pgm_error_free (pgm_error);
141 142 143
            errno = EINVAL;
            return NULL;
        }
144 145

        //  PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
146 147 148 149
        zmq_assert (false);
    }
#endif

150 151 152 153 154 155 156 157 158 159 160 161 162
#ifdef ZMQ_HAVE_WINDOWS
    //  Intialise Windows sockets. Note that WSAStartup can be called multiple
    //  times given that WSACleanup will be called for each WSAStartup.
   //  We do this before the ctx constructor since its embedded mailbox_t
   //  object needs Winsock to be up and running.
    WORD version_requested = MAKEWORD (2, 2);
    WSADATA wsa_data;
    int rc = WSAStartup (version_requested, &wsa_data);
    zmq_assert (rc == 0);
    zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
        HIBYTE (wsa_data.wVersion) == 2);
#endif

163
    //  Create 0MQ context.
164
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
165
    alloc_assert (ctx);
166 167 168 169 170 171 172 173 174 175 176 177 178
    return ctx;
}

void *zmq_init (int io_threads_)
{
    return (void*) inner_init (io_threads_);
}

void *zmq_init_thread_safe (int io_threads_)
{
  zmq::ctx_t *ctx = inner_init (io_threads_);
  ctx->set_thread_safe();
  return (void*) ctx;
Martin Sustrik's avatar
Martin Sustrik committed
179 180
}

181
int zmq_term (void *ctx_)
Martin Sustrik's avatar
Martin Sustrik committed
182
{
183
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
184 185 186
        errno = EFAULT;
        return -1;
    }
187 188 189 190

    int rc = ((zmq::ctx_t*) ctx_)->terminate ();
    int en = errno;

191 192 193 194 195 196
#ifdef ZMQ_HAVE_WINDOWS
    //  On Windows, uninitialise socket layer.
    rc = WSACleanup ();
    wsa_assert (rc != SOCKET_ERROR);
#endif

197 198 199 200 201 202 203 204
#if defined ZMQ_HAVE_OPENPGM
    //  Shut down the OpenPGM library.
    if (pgm_shutdown () != TRUE)
        zmq_assert (false);
#endif

    errno = en;
    return rc;
Martin Sustrik's avatar
Martin Sustrik committed
205 206
}

207 208
// Sockets.

209
void *zmq_socket (void *ctx_, int type_)
Martin Sustrik's avatar
Martin Sustrik committed
210
{
211
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
212 213 214
        errno = EFAULT;
        return NULL;
    }
215 216 217 218
    zmq::ctx_t *ctx = (zmq::ctx_t*) ctx_;
    zmq::socket_base_t *s = ctx->create_socket (type_);
    if (ctx->get_thread_safe ()) s->set_thread_safe ();
    return (void*) s;
Martin Sustrik's avatar
Martin Sustrik committed
219 220
}

Martin Sustrik's avatar
Martin Sustrik committed
221
int zmq_close (void *s_)
Martin Sustrik's avatar
Martin Sustrik committed
222
{
223 224
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
225 226
        return -1;
    }
227
    ((zmq::socket_base_t*) s_)->close ();
Martin Sustrik's avatar
Martin Sustrik committed
228 229 230
    return 0;
}

231 232
int zmq_setsockopt (void *s_, int option_, const void *optval_,
    size_t optvallen_)
Martin Sustrik's avatar
Martin Sustrik committed
233
{
234 235
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
236 237
        return -1;
    }
skaller's avatar
skaller committed
238 239 240 241 242
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    if(s->thread_safe()) s->lock();
    int result = s->setsockopt (option_, optval_, optvallen_);
    if(s->thread_safe()) s->unlock();
    return result;
Martin Sustrik's avatar
Martin Sustrik committed
243 244
}

245 246
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
247 248
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
249 250
        return -1;
    }
skaller's avatar
skaller committed
251 252 253 254 255
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    if(s->thread_safe()) s->lock();
    int result = s->getsockopt (option_, optval_, optvallen_);
    if(s->thread_safe()) s->unlock();
    return result;
256 257
}

258
int zmq_bind (void *s_, const char *addr_)
Martin Sustrik's avatar
Martin Sustrik committed
259
{
260 261
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
262 263
        return -1;
    }
skaller's avatar
skaller committed
264 265 266 267 268
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    if(s->thread_safe()) s->lock();
    int result = s->bind (addr_);
    if(s->thread_safe()) s->unlock();
    return result;
269 270 271 272
}

int zmq_connect (void *s_, const char *addr_)
{
273 274
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
275 276
        return -1;
    }
skaller's avatar
skaller committed
277 278 279 280 281 282 283
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    if(s->thread_safe()) s->lock();
    int result = s->connect (addr_);
    if(s->thread_safe()) s->unlock();
    return result;
}

284 285
// Sending functions.

skaller's avatar
skaller committed
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
static int inner_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
    int sz = (int) zmq_msg_size (msg_);
    int rc = s_->send ((zmq::msg_t*) msg_, flags_);
    if (unlikely (rc < 0))
        return -1;
    return sz;
}

int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    if(s->thread_safe()) s->lock();
    int result = inner_sendmsg (s, msg_, flags_);
    if(s->thread_safe()) s->unlock();
    return result;
Martin Sustrik's avatar
Martin Sustrik committed
306 307
}

308 309
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
{
skaller's avatar
skaller committed
310 311 312 313
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
314 315 316 317 318 319
    zmq_msg_t msg;
    int rc = zmq_msg_init_size (&msg, len_);
    if (rc != 0)
        return -1;
    memcpy (zmq_msg_data (&msg), buf_, len_);

skaller's avatar
skaller committed
320 321 322 323
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    if(s->thread_safe()) s->lock();
    rc = inner_sendmsg (s, &msg, flags_);
    if(s->thread_safe()) s->unlock();
324 325 326 327 328 329 330 331 332 333 334 335 336
    if (unlikely (rc < 0)) {
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }
    
    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
    return rc;
}

skaller's avatar
skaller committed
337 338 339 340
// Send multiple messages.
//
// If flag bit ZMQ_SNDMORE is set the vector is treated as
// a single multi-part message, i.e. the last message has
skaller's avatar
skaller committed
341
// ZMQ_SNDMORE bit switched off.
skaller's avatar
skaller committed
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
//
int zmq_sendv (void *s_, iovec *a_, size_t count_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    int rc = 0;
    zmq_msg_t msg;
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    if(s->thread_safe()) s->lock();
    for(size_t i = 0; i < count_; ++i)
    {
        rc = zmq_msg_init_size (&msg, a_[i].iov_len);
        if (rc != 0)
        {
            rc = -1;
            break;
        }
        memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);

        if (i == count_ - 1) flags_ = flags_ & ~ZMQ_SNDMORE;
        rc = inner_sendmsg (s, &msg, flags_);
        if (unlikely (rc < 0)) {
           int err = errno;
           int rc2 = zmq_msg_close (&msg);
           errno_assert (rc2 == 0);
           errno = err;
           rc = -1;
           break;
        }
    }
    if(s->thread_safe()) s->unlock();
    return rc; 
}

378 379
// Receiving functions.

skaller's avatar
skaller committed
380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
static int inner_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
    int rc = s_->recv ((zmq::msg_t*) msg_, flags_);
    if (unlikely (rc < 0))
        return -1;
    return (int) zmq_msg_size (msg_);
}

int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    if(s->thread_safe()) s->lock();
    int result = inner_recvmsg(s, msg_, flags_);
    if(s->thread_safe()) s->unlock();
    return result;
}


402 403
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
{
skaller's avatar
skaller committed
404 405 406 407
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
408 409 410 411
    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg);
    errno_assert (rc == 0);

skaller's avatar
skaller committed
412 413 414 415
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    if(s->thread_safe()) s->lock();
    int nbytes = inner_recvmsg (s, &msg, flags_);
    if(s->thread_safe()) s->unlock();
416
    if (unlikely (nbytes < 0)) {
417
        int err = errno;
418 419
        rc = zmq_msg_close (&msg);
        errno_assert (rc == 0);
420 421 422 423 424 425
        errno = err;
        return -1;
    }

    //  At the moment an oversized message is silently truncated.
    //  TODO: Build in a notification mechanism to report the overflows.
426
    size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
427
    memcpy (buf_, zmq_msg_data (&msg), to_copy);
428 429 430 431

    rc = zmq_msg_close (&msg);
    errno_assert (rc == 0);

432
    return nbytes;    
433 434
}

skaller's avatar
skaller committed
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498
// Receive a multi-part message
// 
// Receives up to *count_ parts of a multi-part message.
// Sets *count_ to the actual number of parts read.
// ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
// Returns number of message parts read, or -1 on error.
//
// Note: even if -1 is returned, some parts of the message
// may have been read. Therefore the client must consult
// *count_ to retrieve message parts successfully read,
// even if -1 is returned.
//
// The iov_base* buffers of each iovec *a_ filled in by this 
// function may be freed using free().
//
// Implementation note: We assume zmq::msg_t buffer allocated
// by zmq::recvmsg can be freed by free().
// We assume it is safe to steal these buffers by simply
// not closing the zmq::msg_t.
//
int zmq_recvmmsg (void *s_, iovec *a_, size_t *count_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    if(s->thread_safe()) s->lock();

    size_t count = (int)*count_;
    int nread = 0;
    bool recvmore = true;

    for(size_t i = 0; recvmore && i < count; ++i)
    {
        // Cheat! We never close any msg
        // because we want to steal the buffer.
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        errno_assert (rc == 0);

        int nbytes = inner_recvmsg (s, &msg, flags_);
        if (unlikely (nbytes < 0)) {
            int err = errno;
            rc = zmq_msg_close (&msg);
            errno_assert (rc == 0);
            errno = err;
            nread = -1;
            break;
        }
        ++*count_;
        ++nread;

        // Cheat: acquire zmq_msg buffer.
        a_[i].iov_base = zmq_msg_data (&msg);
        a_[i].iov_len = zmq_msg_size (&msg);

        // Assume zmq_socket ZMQ_RVCMORE is properly set.
        recvmore =((zmq::msg_t*) (void*) &msg)->flags () & zmq::msg_t::more;
    }
    if(s->thread_safe()) s->unlock();
    return nread;    
}

499 500
// Message manipulators.

501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
int zmq_msg_init (zmq_msg_t *msg_)
{
    return ((zmq::msg_t*) msg_)->init ();
}

int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
    return ((zmq::msg_t*) msg_)->init_size (size_);
}

int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
    zmq_free_fn *ffn_, void *hint_)
{
    return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_);
}

int zmq_msg_close (zmq_msg_t *msg_)
{
    return ((zmq::msg_t*) msg_)->close ();
}

int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    return ((zmq::msg_t*) dest_)->move (*(zmq::msg_t*) src_);
}

int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    return ((zmq::msg_t*) dest_)->copy (*(zmq::msg_t*) src_);
}

void *zmq_msg_data (zmq_msg_t *msg_)
{
    return ((zmq::msg_t*) msg_)->data ();
}

size_t zmq_msg_size (zmq_msg_t *msg_)
{
    return ((zmq::msg_t*) msg_)->size ();
}

542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560
int zmq_getmsgopt (zmq_msg_t *msg_, int option_, void *optval_,
    size_t *optvallen_)
{
    switch (option_) {
    case ZMQ_MORE:
        if (*optvallen_ < sizeof (int)) {
            errno = EINVAL;
            return -1;
        }
        *((int*) optval_) =
            (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more) ? 1 : 0;
        *optvallen_ = sizeof (int);
        return 0;
    default:
        errno = EINVAL;
        return -1;
    }
}

561 562
// Polling.

563 564 565
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
#if defined ZMQ_POLL_BASED_ON_POLL
566 567 568 569 570 571 572 573
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
574
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
575
        return 0;
Mikko Koppanen's avatar
Mikko Koppanen committed
576 577 578
#elif defined ZMQ_HAVE_ANDROID
        usleep (timeout_ * 1000);
        return 0;
579
#else
580
        return usleep (timeout_ * 1000);
581 582
#endif
    }
583

584 585 586 587
    if (!items_) {
        errno = EFAULT;
        return -1;
    }
588

589 590 591 592
    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;

593
    pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
594
    alloc_assert (pollfds);
595

596
    //  Build pollset for poll () system call.
597 598
    for (int i = 0; i != nitems_; i++) {

599 600
        //  If the poll item is a 0MQ socket, we poll on the file descriptor
        //  retrieved by the ZMQ_FD socket option.
601
        if (items_ [i].socket) {
602
            size_t zmq_fd_size = sizeof (zmq::fd_t);
603 604 605 606
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
                &zmq_fd_size) == -1) {
                free (pollfds);
                return -1;
607
            }
608
            pollfds [i].events = items_ [i].events ? POLLIN : 0;
609
        }
610 611
        //  Else, the poll item is a raw file descriptor. Just convert the
        //  events to normal POLLIN/POLLOUT for poll ().
Martin Lucina's avatar
Martin Lucina committed
612
        else {
613 614 615 616
            pollfds [i].fd = items_ [i].fd;
            pollfds [i].events =
                (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
                (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
Martin Lucina's avatar
Martin Lucina committed
617
        }
618 619
    }

620
    bool first_pass = true;
621
    int nevents = 0;
622

623
    while (true) {
624

625 626 627 628 629 630 631 632 633
         //  Compute the timeout for the subsequent poll.
         int timeout;
         if (first_pass)
             timeout = 0;
         else if (timeout_ < 0)
             timeout = -1;
         else
             timeout = end - now;

634
        //  Wait for events.
635
        while (true) {
636
            int rc = poll (pollfds, nitems_, timeout);
637
            if (rc == -1 && errno == EINTR) {
638 639
                free (pollfds);
                return -1;
640
            }
641 642
            errno_assert (rc >= 0);
            break;
643
        }
644

645 646
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
647

648
            items_ [i].revents = 0;
649

650 651
            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
652
            if (items_ [i].socket) {
653 654 655 656 657 658 659 660 661 662 663 664 665
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                    &zmq_events_size) == -1) {
                    free (pollfds);
                    return -1;
                }
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
666
            }
667 668 669 670 671 672 673 674 675 676 677 678 679
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (pollfds [i].revents & POLLIN)
                    items_ [i].revents |= ZMQ_POLLIN;
                if (pollfds [i].revents & POLLOUT)
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (pollfds [i].revents & ~(POLLIN | POLLOUT))
                    items_ [i].revents |= ZMQ_POLLERR;
            }

            if (items_ [i].revents)
                nevents++;
680
        }
681

682 683 684
        //  If timout is zero, exit immediately whether there are events or not.
        if (timeout_ == 0)
            break;
685

686 687 688 689 690 691
        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
692 693 694
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
695
            continue;
696
        }
697

698 699 700 701 702 703
        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling have begun. (We assume that
        //  first pass have taken negligible time). We also compute the time
        //  when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
704
            end = now + timeout_;
705 706 707
            if (now == end)
                break;
            first_pass = false;
708 709
            continue;
        }
710

711 712 713 714
        //  Find out whether timeout have expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
715 716 717 718 719
    }

    free (pollfds);
    return nevents;

720
#elif defined ZMQ_POLL_BASED_ON_SELECT
721

722 723 724 725 726 727 728 729
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
730
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
731 732
        return 0;
#else
733
        return usleep (timeout_ * 1000);
734 735 736
#endif
    }

737 738 739 740 741 742 743 744 745 746 747 748 749
    if (!items_) {
        errno = EFAULT;
        return -1;
    }

    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;

    //  Ensure we do not attempt to select () on more than FD_SETSIZE
    //  file descriptors.
    zmq_assert (nitems_ <= FD_SETSIZE);

750 751 752 753 754 755 756
    fd_set pollset_in;
    FD_ZERO (&pollset_in);
    fd_set pollset_out;
    FD_ZERO (&pollset_out);
    fd_set pollset_err;
    FD_ZERO (&pollset_err);

757
    zmq::fd_t maxfd = 0;
758

759
    //  Build the fd_sets for passing to select ().
760 761
    for (int i = 0; i != nitems_; i++) {

762 763
        //  If the poll item is a 0MQ socket we are interested in input on the
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
764
        if (items_ [i].socket) {
765 766 767 768 769
            size_t zmq_fd_size = sizeof (zmq::fd_t);
            zmq::fd_t notify_fd;
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
                &zmq_fd_size) == -1)
                return -1;
770 771 772 773 774
            if (items_ [i].events) {
                FD_SET (notify_fd, &pollset_in);
                if (maxfd < notify_fd)
                    maxfd = notify_fd;
            }
775
        }
776 777 778 779 780 781 782 783 784 785 786
        //  Else, the poll item is a raw file descriptor. Convert the poll item
        //  events to the appropriate fd_sets.
        else {
            if (items_ [i].events & ZMQ_POLLIN)
                FD_SET (items_ [i].fd, &pollset_in);
            if (items_ [i].events & ZMQ_POLLOUT)
                FD_SET (items_ [i].fd, &pollset_out);
            if (items_ [i].events & ZMQ_POLLERR)
                FD_SET (items_ [i].fd, &pollset_err);
            if (maxfd < items_ [i].fd)
                maxfd = items_ [i].fd;
787 788 789
        }
    }

790
    bool first_pass = true;
791
    int nevents = 0;
792
    fd_set inset, outset, errset;
793 794

    while (true) {
795

796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811
        //  Compute the timeout for the subsequent poll.
        timeval timeout;
        timeval *ptimeout;
        if (first_pass) {
            timeout.tv_sec = 0;
            timeout.tv_usec = 0;
            ptimeout = &timeout;
        }
        else if (timeout_ < 0)
            ptimeout = NULL;
        else {
            timeout.tv_sec = (long) ((end - now) / 1000);
            timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
            ptimeout = &timeout;
        }

812 813 814 815 816
        //  Wait for events. Ignore interrupts if there's infinite timeout.
        while (true) {
            memcpy (&inset, &pollset_in, sizeof (fd_set));
            memcpy (&outset, &pollset_out, sizeof (fd_set));
            memcpy (&errset, &pollset_err, sizeof (fd_set));
817
#if defined ZMQ_HAVE_WINDOWS
818
            int rc = select (0, &inset, &outset, &errset, ptimeout);
819
            if (unlikely (rc == SOCKET_ERROR)) {
820
                zmq::wsa_error_to_errno ();
821 822 823 824
                if (errno == ENOTSOCK)
                    return -1;
                wsa_assert (false);
            }
825
#else
826
            int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
Bernd Prager's avatar
Bernd Prager committed
827
            if (unlikely (rc == -1)) {
828 829 830 831
                if (errno == EINTR || errno == EBADF)
                    return -1;
                errno_assert (false);
            }
832
#endif
833 834
            break;
        }
835

836 837
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
838

839
            items_ [i].revents = 0;
840

841 842 843
            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
            if (items_ [i].socket) {
844 845 846 847
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                      &zmq_events_size) == -1)
848
                    return -1;
849 850 851 852 853 854
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
855 856 857 858 859
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (FD_ISSET (items_ [i].fd, &inset))
860
                    items_ [i].revents |= ZMQ_POLLIN;
861 862 863 864
                if (FD_ISSET (items_ [i].fd, &outset))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (FD_ISSET (items_ [i].fd, &errset))
                    items_ [i].revents |= ZMQ_POLLERR;
865
            }
866 867 868

            if (items_ [i].revents)
                nevents++;
869
        }
870

871 872 873
        //  If timout is zero, exit immediately whether there are events or not.
        if (timeout_ == 0)
            break;
874

875 876 877 878 879 880
        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
881 882 883
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
884
            continue;
885
        }
886

887 888 889 890 891 892
        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling have begun. (We assume that
        //  first pass have taken negligible time). We also compute the time
        //  when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
893
            end = now + timeout_;
894 895 896
            if (now == end)
                break;
            first_pass = false;
897 898
            continue;
        }
899

900 901 902 903
        //  Find out whether timeout have expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
904 905 906 907 908
    }

    return nevents;

#else
909
    //  Exotic platforms that support neither poll() nor select().
910 911
    errno = ENOTSUP;
    return -1;
912 913 914
#endif
}

915 916 917 918 919 920 921
#if defined ZMQ_POLL_BASED_ON_SELECT
#undef ZMQ_POLL_BASED_ON_SELECT
#endif
#if defined ZMQ_POLL_BASED_ON_POLL
#undef ZMQ_POLL_BASED_ON_POLL
#endif