zmq.cpp 21.9 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2 3
    Copyright (c) 2007-2011 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
4 5 6 7

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
8
    the terms of the GNU Lesser General Public License as published by
Martin Sustrik's avatar
Martin Sustrik committed
9 10 11 12 13 14
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
    GNU Lesser General Public License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
16

17
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
18 19 20
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

21 22 23 24 25 26 27 28 29 30 31 32 33 34
#include "platform.hpp"

//  On AIX, poll.h has to be included before zmq.h to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
    defined ZMQ_HAVE_NETBSD
#include <poll.h>
#endif

35
#include "../include/zmq.h"
Martin Sustrik's avatar
Martin Sustrik committed
36
#include "../include/zmq_utils.h"
Martin Sustrik's avatar
Martin Sustrik committed
37

38
#include <string.h>
Martin Sustrik's avatar
Martin Sustrik committed
39 40 41 42
#include <errno.h>
#include <stdlib.h>
#include <new>

43
#include "device.hpp"
44
#include "socket_base.hpp"
45
#include "msg_content.hpp"
46
#include "stdint.hpp"
47
#include "config.hpp"
48
#include "likely.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
49
#include "clock.hpp"
50
#include "ctx.hpp"
51
#include "err.hpp"
52
#include "fd.hpp"
53

54 55 56
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#endif
Martin Sustrik's avatar
Martin Sustrik committed
57

58
#if defined ZMQ_HAVE_OPENPGM
59
#define __PGM_WININT_H__
60
#include <pgm/pgm.h>
61 62 63 64 65 66

//  TODO: OpenPGM redefines bool -- remove this once OpenPGM is fixed.
#if defined bool
#undef bool
#endif

67 68
#endif

69 70
void zmq_version (int *major_, int *minor_, int *patch_)
{
Martin Sustrik's avatar
Martin Sustrik committed
71 72 73
    *major_ = ZMQ_VERSION_MAJOR;
    *minor_ = ZMQ_VERSION_MINOR;
    *patch_ = ZMQ_VERSION_PATCH;
74 75
}

76 77
const char *zmq_strerror (int errnum_)
{
78
    return zmq::errno_to_string (errnum_);
79 80
}

81
//  Initialises 'msg_' as an empty message: a very small message (VSM) of
//  size zero with no flags set. Always returns 0.
int zmq_msg_init (zmq_msg_t *msg_)
{
    //  The three fields are independent of each other.
    msg_->vsm_size = 0;
    msg_->flags = 0;
    msg_->content = (zmq::msg_content_t*) ZMQ_VSM;

    return 0;
}

89
//  Initialises 'msg_' so that it can hold 'size_' bytes of payload.
//
//  Payloads of up to ZMQ_MAX_VSM_SIZE bytes are stored inside the message
//  structure itself (VSM). Larger payloads get a single heap block that
//  holds the msg_content_t header immediately followed by the data.
//
//  Returns 0 on success. Returns -1 with errno set to ENOMEM when the heap
//  block cannot be allocated, including the case where the requested size
//  is so large that adding the header size would overflow size_t.
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
    if (size_ <= ZMQ_MAX_VSM_SIZE) {
        msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
        msg_->flags = 0;
        msg_->vsm_size = (uint8_t) size_;
        return 0;
    }

    //  Guard against wrap-around: without this check a huge 'size_' would
    //  yield a tiny allocation while content->size still claimed 'size_'
    //  bytes were available.
    const size_t total_size = sizeof (zmq::msg_content_t) + size_;
    if (total_size < size_) {
        errno = ENOMEM;
        return -1;
    }

    msg_->content = (zmq::msg_content_t*) malloc (total_size);
    if (!msg_->content) {
        errno = ENOMEM;
        return -1;
    }
    msg_->flags = 0;

    zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;

    //  The payload lives directly behind the header in the same block, so
    //  no deallocation function is needed for it.
    content->data = (void*) (content + 1);
    content->size = size_;
    content->ffn = NULL;
    content->hint = NULL;

    //  The block came from malloc, so the reference counter has to be
    //  constructed in place.
    new (&content->refcnt) zmq::atomic_counter_t ();

    return 0;
}

115
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
116
    zmq_free_fn *ffn_, void *hint_)
Martin Sustrik's avatar
Martin Sustrik committed
117
{
118
    msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
119
    alloc_assert (msg_->content);
120
    msg_->flags = 0;
121 122 123 124
    zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
    content->data = data_;
    content->size = size_;
    content->ffn = ffn_;
125
    content->hint = hint_;
126
    new (&content->refcnt) zmq::atomic_counter_t ();
Martin Sustrik's avatar
Martin Sustrik committed
127 128 129
    return 0;
}

130
//  Releases whatever 'msg_' refers to and resets it to an empty message.
//  Always returns 0.
int zmq_msg_close (zmq_msg_t *msg_)
{
    const bool is_vsm =
        msg_->content == (zmq::msg_content_t*) ZMQ_VSM;
    const bool is_delimiter =
        msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER;

    //  VSMs and delimiters own no heap resources; everything else does.
    if (!is_delimiter && !is_vsm) {
        zmq::msg_content_t *payload = (zmq::msg_content_t*) msg_->content;

        //  A message that is not shared is released right away. For a
        //  shared one the reference count is decremented first and the
        //  content is released only when the count has dropped to zero.
        bool release = true;
        if (msg_->flags & ZMQ_MSG_SHARED)
            release = !payload->refcnt.sub (1);

        if (release) {

            //  The counter was created with placement new, hence the
            //  explicit destructor call.
            payload->refcnt.~atomic_counter_t ();

            //  Hand the data back to its deallocation function, if any,
            //  before freeing the content block itself.
            if (payload->ffn)
                payload->ffn (payload->data, payload->hint);
            free (payload);
        }
    }

    //  Leave the message looking like a freshly initialised empty one so
    //  that later accidental use is harmless.
    msg_->vsm_size = 0;
    msg_->flags = 0;
    msg_->content = (zmq::msg_content_t*) ZMQ_VSM;

    return 0;
}

160
//  Moves the content of 'src_' into 'dest_'. Whatever 'dest_' held before
//  is released, and 'src_' is left as an empty message. Always returns 0.
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    //  Moving a message onto itself must not touch it: closing the
    //  destination first would otherwise destroy the very payload that is
    //  being moved.
    if (dest_ == src_)
        return 0;

    zmq_msg_close (dest_);

    //  A plain structure copy transfers ownership; no reference counts are
    //  adjusted because the source gives its reference up below.
    *dest_ = *src_;
    zmq_msg_init (src_);
    return 0;
}

168
//  Makes 'dest_' refer to the same content as 'src_'. Whatever 'dest_'
//  held before is released. Heap-backed content is not duplicated; both
//  messages end up sharing one reference-counted content block.
//  Always returns 0.
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    //  Copying a message onto itself must not touch it: closing the
    //  destination first would otherwise destroy the payload that is about
    //  to be copied.
    if (dest_ == src_)
        return 0;

    zmq_msg_close (dest_);

    //  VSMs and delimiters require no special handling.
    if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
          src_->content != (zmq::msg_content_t*) ZMQ_VSM) {

        //  One reference is added to shared messages. Non-shared messages
        //  are turned into shared messages and reference count is set to 2.
        zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
        if (src_->flags & ZMQ_MSG_SHARED)
            content->refcnt.add (1);
        else {
            src_->flags |= ZMQ_MSG_SHARED;
            content->refcnt.set (2);
        }
    }

    //  The structure copy carries over the (possibly just set) shared flag
    //  together with the content pointer or the inline VSM data.
    *dest_ = *src_;
    return 0;
}

191
//  Returns a pointer to the payload of 'msg_': the inline buffer for a
//  VSM, NULL for a delimiter, and the data pointer recorded in the content
//  block for every other message.
void *zmq_msg_data (zmq_msg_t *msg_)
{
    void *payload;

    if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
        payload = msg_->vsm_data;
    else if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
        payload = NULL;
    else {
        zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
        payload = content->data;
    }

    return payload;
}

201
//  Returns the payload size of 'msg_' in bytes: the inline size for a VSM,
//  zero for a delimiter, and the size recorded in the content block for
//  every other message.
size_t zmq_msg_size (zmq_msg_t *msg_)
{
    size_t length;

    if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
        length = msg_->vsm_size;
    else if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
        length = 0;
    else {
        zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
        length = content->size;
    }

    return length;
}

211
void *zmq_init (int io_threads_)
Martin Sustrik's avatar
Martin Sustrik committed
212
{
213 214 215 216
    if (io_threads_ < 0) {
        errno = EINVAL;
        return NULL;
    }
Martin Sustrik's avatar
Martin Sustrik committed
217

218 219 220 221 222 223
#if defined ZMQ_HAVE_OPENPGM

    //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
    //  protocol ID. Note that if you want to use gettimeofday and sleep for
    //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
    //  PGM_SLEEP to "USLEEP".
Steven McCoy's avatar
Steven McCoy committed
224
    pgm_error_t *pgm_error = NULL;
225 226
    const bool ok = pgm_init (&pgm_error);
    if (ok != TRUE) {
227 228 229 230 231 232 233

        //  Invalid parameters don't set pgm_error_t
        zmq_assert (pgm_error != NULL);
        if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
              pgm_error->code == PGM_ERROR_FAILED)) {

            //  Failed to access RTC or HPET device.
Steven McCoy's avatar
Steven McCoy committed
234
            pgm_error_free (pgm_error);
235 236 237
            errno = EINVAL;
            return NULL;
        }
238 239

        //  PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
240 241 242 243
        zmq_assert (false);
    }
#endif

244 245 246 247 248 249 250 251 252 253 254 255 256
#ifdef ZMQ_HAVE_WINDOWS
    //  Intialise Windows sockets. Note that WSAStartup can be called multiple
    //  times given that WSACleanup will be called for each WSAStartup.
   //  We do this before the ctx constructor since its embedded mailbox_t
   //  object needs Winsock to be up and running.
    WORD version_requested = MAKEWORD (2, 2);
    WSADATA wsa_data;
    int rc = WSAStartup (version_requested, &wsa_data);
    zmq_assert (rc == 0);
    zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
        HIBYTE (wsa_data.wVersion) == 2);
#endif

257
    //  Create 0MQ context.
258
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
259
    alloc_assert (ctx);
260
    return (void*) ctx;
Martin Sustrik's avatar
Martin Sustrik committed
261 262
}

263
int zmq_term (void *ctx_)
Martin Sustrik's avatar
Martin Sustrik committed
264
{
265 266 267 268
    if (!ctx_) {
        errno = EFAULT;
        return -1;
    }
269 270 271 272

    int rc = ((zmq::ctx_t*) ctx_)->terminate ();
    int en = errno;

273 274 275 276 277 278
#ifdef ZMQ_HAVE_WINDOWS
    //  On Windows, uninitialise socket layer.
    rc = WSACleanup ();
    wsa_assert (rc != SOCKET_ERROR);
#endif

279 280 281 282 283 284 285 286
#if defined ZMQ_HAVE_OPENPGM
    //  Shut down the OpenPGM library.
    if (pgm_shutdown () != TRUE)
        zmq_assert (false);
#endif

    errno = en;
    return rc;
Martin Sustrik's avatar
Martin Sustrik committed
287 288
}

289
void *zmq_socket (void *ctx_, int type_)
Martin Sustrik's avatar
Martin Sustrik committed
290
{
291 292 293 294
    if (!ctx_) {
        errno = EFAULT;
        return NULL;
    }
295
    return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_));
Martin Sustrik's avatar
Martin Sustrik committed
296 297
}

Martin Sustrik's avatar
Martin Sustrik committed
298
int zmq_close (void *s_)
Martin Sustrik's avatar
Martin Sustrik committed
299
{
300 301 302 303
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
304
    ((zmq::socket_base_t*) s_)->close ();
Martin Sustrik's avatar
Martin Sustrik committed
305 306 307
    return 0;
}

308 309
int zmq_setsockopt (void *s_, int option_, const void *optval_,
    size_t optvallen_)
Martin Sustrik's avatar
Martin Sustrik committed
310
{
311 312 313 314
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
315 316
    return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_,
        optvallen_));
Martin Sustrik's avatar
Martin Sustrik committed
317 318
}

319 320
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
321 322 323 324
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
325 326 327 328
    return (((zmq::socket_base_t*) s_)->getsockopt (option_, optval_,
        optvallen_));
}

329
int zmq_bind (void *s_, const char *addr_)
Martin Sustrik's avatar
Martin Sustrik committed
330
{
331 332 333 334
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
335
    return (((zmq::socket_base_t*) s_)->bind (addr_));
336 337 338 339
}

int zmq_connect (void *s_, const char *addr_)
{
340 341 342 343
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
344
    return (((zmq::socket_base_t*) s_)->connect (addr_));
Martin Sustrik's avatar
Martin Sustrik committed
345 346
}

347
int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
Martin Sustrik's avatar
Martin Sustrik committed
348
{
349 350 351 352
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
353
    return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
Martin Sustrik's avatar
Martin Sustrik committed
354 355
}

356
int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
Martin Sustrik's avatar
Martin Sustrik committed
357
{
358 359 360 361
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
362
    return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
Martin Sustrik's avatar
Martin Sustrik committed
363
}
364

365 366 367 368 369
//  Pick the system call zmq_poll is built on. An explicit ZMQ_FORCE_SELECT
//  wins over ZMQ_FORCE_POLL; without either, the choice follows the
//  detected platform. If nothing matches, neither macro is defined.
#if defined(ZMQ_FORCE_SELECT)
#define ZMQ_POLL_BASED_ON_SELECT
#elif defined(ZMQ_FORCE_POLL)
#define ZMQ_POLL_BASED_ON_POLL
#elif defined(ZMQ_HAVE_LINUX) || defined(ZMQ_HAVE_FREEBSD) ||\
    defined(ZMQ_HAVE_OPENBSD) || defined(ZMQ_HAVE_SOLARIS) ||\
    defined(ZMQ_HAVE_OSX) || defined(ZMQ_HAVE_QNXNTO) ||\
    defined(ZMQ_HAVE_HPUX) || defined(ZMQ_HAVE_AIX) ||\
    defined(ZMQ_HAVE_NETBSD)
#define ZMQ_POLL_BASED_ON_POLL
#elif defined(ZMQ_HAVE_WINDOWS) || defined(ZMQ_HAVE_OPENVMS)
#define ZMQ_POLL_BASED_ON_SELECT
#endif

//  Waits for events on a set of 0MQ sockets and/or raw file descriptors.
//
//  items_    - array of 'nitems_' poll items; 'revents' of each item is
//              rewritten on every polling pass.
//  nitems_   - number of items; negative values fail with EINVAL, zero
//              turns the call into a plain sleep.
//  timeout_  - zero returns after a single pass, a negative value waits
//              until an event arrives, a positive value is a timeout in
//              microseconds (it is divided by 1000 to get milliseconds).
//
//  Returns the number of items with a non-zero 'revents', or -1 with errno
//  set on failure: EINVAL for a negative item count, EFAULT for a null
//  item array, and -1 as well when the underlying call is interrupted
//  (EINTR) or when querying a 0MQ socket fails. On platforms with neither
//  poll () nor select () the call fails with ENOTSUP.
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
#if defined ZMQ_POLL_BASED_ON_POLL

    if (unlikely (nitems_ <= 0)) {
        if (nitems_ < 0) {
            errno = EINVAL;
            return -1;
        }

        //  Nothing to poll on: either return at once or just sleep for
        //  the requested time.
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
        Sleep (timeout_ > 0 ? timeout_ / 1000 : INFINITE);
        return 0;
#else
        return usleep (timeout_);
#endif
    }

    if (items_ == NULL) {
        errno = EFAULT;
        return -1;
    }

    zmq::clock_t timer;
    uint64_t current_ms = 0;
    uint64_t deadline_ms = 0;

    pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
    alloc_assert (pollfds);

    //  Translate the poll items into a pollfd array for poll ().
    for (int idx = 0; idx < nitems_; ++idx) {
        zmq_pollitem_t &item = items_ [idx];
        pollfd &slot = pollfds [idx];

        //  A raw file descriptor only needs its event mask converted to
        //  POLLIN/POLLOUT.
        if (!item.socket) {
            short wanted = 0;
            if (item.events & ZMQ_POLLIN)
                wanted |= POLLIN;
            if (item.events & ZMQ_POLLOUT)
                wanted |= POLLOUT;
            slot.fd = item.fd;
            slot.events = wanted;
            continue;
        }

        //  A 0MQ socket is represented by the file descriptor obtained
        //  through the ZMQ_FD option; we wait for input on it whenever
        //  any event at all was requested.
        size_t fd_size = sizeof (zmq::fd_t);
        if (zmq_getsockopt (item.socket, ZMQ_FD, &slot.fd,
              &fd_size) == -1) {
            free (pollfds);
            return -1;
        }
        slot.events = item.events ? POLLIN : 0;
    }

    bool initial_pass = true;
    int ready = 0;

    for (;;) {

        //  The first pass never blocks. Later passes block without limit
        //  for a negative timeout, or until the deadline otherwise.
        const int wait_ms = initial_pass ? 0 :
            (timeout_ < 0 ? -1 : (int) (deadline_ms - current_ms));

        //  Wait for events. An interrupted call ends zmq_poll with -1;
        //  any other failure is fatal.
        int rc = poll (pollfds, nitems_, wait_ms);
        if (rc == -1 && errno == EINTR) {
            free (pollfds);
            return -1;
        }
        errno_assert (rc >= 0);

        //  Work out which of the requested events are pending.
        for (int idx = 0; idx < nitems_; ++idx) {
            zmq_pollitem_t &item = items_ [idx];
            item.revents = 0;

            if (item.socket) {

                //  For a 0MQ socket the pending events come from the
                //  ZMQ_EVENTS option; only requested ones are reported.
                uint32_t pending;
                size_t pending_size = sizeof (uint32_t);
                if (zmq_getsockopt (item.socket, ZMQ_EVENTS, &pending,
                      &pending_size) == -1) {
                    free (pollfds);
                    return -1;
                }
                if ((item.events & ZMQ_POLLOUT) && (pending & ZMQ_POLLOUT))
                    item.revents |= ZMQ_POLLOUT;
                if ((item.events & ZMQ_POLLIN) && (pending & ZMQ_POLLIN))
                    item.revents |= ZMQ_POLLIN;
            }
            else {

                //  For a raw file descriptor convert what poll () has
                //  reported; anything besides POLLIN/POLLOUT counts as an
                //  error condition.
                const short fired = pollfds [idx].revents;
                if (fired & POLLIN)
                    item.revents |= ZMQ_POLLIN;
                if (fired & POLLOUT)
                    item.revents |= ZMQ_POLLOUT;
                if (fired & ~(POLLIN | POLLOUT))
                    item.revents |= ZMQ_POLLERR;
            }

            if (item.revents)
                ready++;
        }

        //  Stop after a single pass for a zero timeout, and as soon as
        //  there is anything to report.
        if (timeout_ == 0 || ready)
            break;

        //  No events and an infinite timeout: keep polling, blocking from
        //  the second pass on.
        if (timeout_ < 0) {
            initial_pass = false;
            continue;
        }

        //  Finite timeout and no events. After the first pass (assumed to
        //  have taken negligible time) the deadline is computed; after
        //  later passes it is checked.
        current_ms = timer.now_ms ();
        if (initial_pass) {
            deadline_ms = current_ms + (timeout_ / 1000);
            if (current_ms == deadline_ms)
                break;
            initial_pass = false;
            continue;
        }
        if (current_ms >= deadline_ms)
            break;
    }

    free (pollfds);
    return ready;

#elif defined ZMQ_POLL_BASED_ON_SELECT

    if (unlikely (nitems_ <= 0)) {
        if (nitems_ < 0) {
            errno = EINVAL;
            return -1;
        }

        //  Nothing to poll on: either return at once or just sleep for
        //  the requested time.
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
        Sleep (timeout_ > 0 ? timeout_ / 1000 : INFINITE);
        return 0;
#else
        return usleep (timeout_);
#endif
    }

    if (items_ == NULL) {
        errno = EFAULT;
        return -1;
    }

    zmq::clock_t timer;
    uint64_t current_ms = 0;
    uint64_t deadline_ms = 0;

    //  Ensure we do not attempt to select () on more than FD_SETSIZE
    //  file descriptors.
    zmq_assert (nitems_ <= FD_SETSIZE);

    //  Master copies of the three descriptor sets; select () works on
    //  scratch copies because it modifies the sets it is given.
    fd_set pollset_in;
    fd_set pollset_out;
    fd_set pollset_err;
    FD_ZERO (&pollset_in);
    FD_ZERO (&pollset_out);
    FD_ZERO (&pollset_err);

    zmq::fd_t maxfd = 0;

    //  Translate the poll items into descriptor sets for select ().
    for (int idx = 0; idx < nitems_; ++idx) {
        zmq_pollitem_t &item = items_ [idx];

        if (item.socket) {

            //  A 0MQ socket is represented by the notification file
            //  descriptor obtained through the ZMQ_FD option; we wait for
            //  input on it whenever any event at all was requested.
            zmq::fd_t notify_fd;
            size_t fd_size = sizeof (zmq::fd_t);
            if (zmq_getsockopt (item.socket, ZMQ_FD, &notify_fd,
                  &fd_size) == -1)
                return -1;
            if (item.events) {
                FD_SET (notify_fd, &pollset_in);
                if (maxfd < notify_fd)
                    maxfd = notify_fd;
            }
        }
        else {

            //  A raw file descriptor goes into the set matching each
            //  requested event.
            if (item.events & ZMQ_POLLIN)
                FD_SET (item.fd, &pollset_in);
            if (item.events & ZMQ_POLLOUT)
                FD_SET (item.fd, &pollset_out);
            if (item.events & ZMQ_POLLERR)
                FD_SET (item.fd, &pollset_err);
            if (maxfd < item.fd)
                maxfd = item.fd;
        }
    }

    bool initial_pass = true;
    int ready = 0;
    fd_set inset, outset, errset;

    for (;;) {

        //  The first pass never blocks. Later passes block without limit
        //  for a negative timeout, or until the deadline otherwise.
        timeval wait;
        timeval *wait_ptr = &wait;
        if (initial_pass) {
            wait.tv_sec = 0;
            wait.tv_usec = 0;
        }
        else if (timeout_ < 0)
            wait_ptr = NULL;
        else {
            const uint64_t remaining_ms = deadline_ms - current_ms;
            wait.tv_sec = (long) (remaining_ms / 1000);
            wait.tv_usec = (long) (remaining_ms % 1000 * 1000);
        }

        //  Wait for events on fresh copies of the master sets. On
        //  non-Windows platforms an interrupted call ends zmq_poll
        //  with -1; any other failure is fatal.
        memcpy (&inset, &pollset_in, sizeof (fd_set));
        memcpy (&outset, &pollset_out, sizeof (fd_set));
        memcpy (&errset, &pollset_err, sizeof (fd_set));
        int rc = select (maxfd + 1, &inset, &outset, &errset, wait_ptr);
#if defined ZMQ_HAVE_WINDOWS
        wsa_assert (rc != SOCKET_ERROR);
#else
        if (rc == -1 && errno == EINTR)
            return -1;
        errno_assert (rc >= 0);
#endif

        //  Work out which of the requested events are pending.
        for (int idx = 0; idx < nitems_; ++idx) {
            zmq_pollitem_t &item = items_ [idx];
            item.revents = 0;

            if (item.socket) {

                //  For a 0MQ socket the pending events come from the
                //  ZMQ_EVENTS option; only requested ones are reported.
                uint32_t pending;
                size_t pending_size = sizeof (uint32_t);
                if (zmq_getsockopt (item.socket, ZMQ_EVENTS, &pending,
                      &pending_size) == -1)
                    return -1;
                if ((item.events & ZMQ_POLLOUT) && (pending & ZMQ_POLLOUT))
                    item.revents |= ZMQ_POLLOUT;
                if ((item.events & ZMQ_POLLIN) && (pending & ZMQ_POLLIN))
                    item.revents |= ZMQ_POLLIN;
            }
            else {

                //  For a raw file descriptor report membership in the
                //  sets returned by select ().
                if (FD_ISSET (item.fd, &inset))
                    item.revents |= ZMQ_POLLIN;
                if (FD_ISSET (item.fd, &outset))
                    item.revents |= ZMQ_POLLOUT;
                if (FD_ISSET (item.fd, &errset))
                    item.revents |= ZMQ_POLLERR;
            }

            if (item.revents)
                ready++;
        }

        //  Stop after a single pass for a zero timeout, and as soon as
        //  there is anything to report.
        if (timeout_ == 0 || ready)
            break;

        //  No events and an infinite timeout: keep polling, blocking from
        //  the second pass on.
        if (timeout_ < 0) {
            initial_pass = false;
            continue;
        }

        //  Finite timeout and no events. After the first pass (assumed to
        //  have taken negligible time) the deadline is computed; after
        //  later passes it is checked.
        current_ms = timer.now_ms ();
        if (initial_pass) {
            deadline_ms = current_ms + (timeout_ / 1000);
            if (current_ms == deadline_ms)
                break;
            initial_pass = false;
            continue;
        }
        if (current_ms >= deadline_ms)
            break;
    }

    return ready;

#else
    //  Exotic platforms that support neither poll() nor select().
    errno = ENOTSUP;
    return -1;
#endif
}

//  The backend selection macros are private to zmq_poll; remove them again.
#ifdef ZMQ_POLL_BASED_ON_SELECT
#undef ZMQ_POLL_BASED_ON_SELECT
#endif
#ifdef ZMQ_POLL_BASED_ON_POLL
#undef ZMQ_POLL_BASED_ON_POLL
#endif

727 728 729 730 731
//  Returns the current value of errno as seen by the calling thread of
//  this library.
int zmq_errno ()
{
    const int last_error = errno;
    return last_error;
}

732 733
int zmq_device (int device_, void *insocket_, void *outsocket_)
{
734 735 736 737
    if (!insocket_ || !outsocket_) {
        errno = EFAULT;
        return -1;
    }
738 739 740 741 742

    if (device_ != ZMQ_FORWARDER && device_ != ZMQ_QUEUE &&
          device_ != ZMQ_STREAMER) {
       errno = EINVAL;
       return -1;
743
    }
744 745 746

    return zmq::device ((zmq::socket_base_t*) insocket_,
        (zmq::socket_base_t*) outsocket_);
747
}
748 749 750 751 752 753 754

////////////////////////////////////////////////////////////////////////////////
//  0MQ utils - to be used by perf tests
////////////////////////////////////////////////////////////////////////////////

//  Suspends the calling thread for 'seconds_' seconds using the native
//  sleep primitive of the platform.
void zmq_sleep (int seconds_)
{
#if defined ZMQ_HAVE_WINDOWS
    //  Sleep () takes milliseconds.
    const int milliseconds = seconds_ * 1000;
    Sleep (milliseconds);
#else
    const int seconds = seconds_;
    sleep (seconds);
#endif
}
761 762 763 764

void *zmq_stopwatch_start ()
{
    uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t));
765
    alloc_assert (watch);
Martin Sustrik's avatar
Martin Sustrik committed
766
    *watch = zmq::clock_t::now_us ();
767 768 769 770 771
    return (void*) watch;
}

unsigned long zmq_stopwatch_stop (void *watch_)
{
Martin Sustrik's avatar
Martin Sustrik committed
772
    uint64_t end = zmq::clock_t::now_us ();
773 774 775 776
    uint64_t start = *(uint64_t*) watch_;
    free (watch_);
    return (unsigned long) (end - start);
}