zmq.cpp 22.2 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2010 iMatix Corporation
Martin Sustrik's avatar
Martin Sustrik committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 21 22 23 24 25 26 27 28 29 30 31 32 33
#include "platform.hpp"

//  On AIX, poll.h has to be included before zmq.h to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
    defined ZMQ_HAVE_NETBSD
#include <poll.h>
#endif

34
#include "../include/zmq.h"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "../include/zmq_utils.h"
Martin Sustrik's avatar
Martin Sustrik committed
36

37
#include <string.h>
Martin Sustrik's avatar
Martin Sustrik committed
38 39 40 41
#include <errno.h>
#include <stdlib.h>
#include <new>

42 43 44
#include "forwarder.hpp"
#include "queue.hpp"
#include "streamer.hpp"
45
#include "socket_base.hpp"
46
#include "app_thread.hpp"
47
#include "msg_content.hpp"
48
#include "stdint.hpp"
49
#include "config.hpp"
50
#include "ctx.hpp"
51
#include "err.hpp"
52
#include "fd.hpp"
53

54 55 56 57
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#include <sys/time.h>
#endif
Martin Sustrik's avatar
Martin Sustrik committed
58

59 60 61 62
#if defined ZMQ_HAVE_OPENPGM
#include <pgm/pgm.h>
#endif

63 64
void zmq_version (int *major_, int *minor_, int *patch_)
{
Martin Sustrik's avatar
Martin Sustrik committed
65 66 67
    *major_ = ZMQ_VERSION_MAJOR;
    *minor_ = ZMQ_VERSION_MINOR;
    *patch_ = ZMQ_VERSION_PATCH;
68 69
}

70 71 72
const char *zmq_strerror (int errnum_)
{
    switch (errnum_) {
73 74 75 76 77
#if defined ZMQ_HAVE_WINDOWS
    case ENOTSUP:
        return "Not supported";
    case EPROTONOSUPPORT:
        return "Protocol not supported";
78 79 80 81 82 83 84 85
    case ENOBUFS:
        return "No buffer space available";
    case ENETDOWN:
        return "Network is down";
    case EADDRINUSE:
        return "Address in use";
    case EADDRNOTAVAIL:
        return "Address not available";
86 87 88 89
    case ECONNREFUSED:
        return "Connection refused";
    case EINPROGRESS:
        return "Operation in progress";
90
#endif
91 92 93 94 95 96
    case EMTHREAD:
        return "Number of preallocated application threads exceeded";
    case EFSM:
        return "Operation cannot be accomplished in current state";
    case ENOCOMPATPROTO:
        return "The protocol is not compatible with the socket type";
97 98
    case ETERM:
        return "Context was terminated";
99
    default:
100 101 102 103
#if defined _MSC_VER
#pragma warning (push)
#pragma warning (disable:4996)
#endif
104
        return strerror (errnum_);
105 106 107
#if defined _MSC_VER
#pragma warning (pop)
#endif
108 109 110
    }
}

111
int zmq_msg_init (zmq_msg_t *msg_)
Martin Sustrik's avatar
Martin Sustrik committed
112
{
113
    msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
114
    msg_->flags = 0;
Martin Sustrik's avatar
Martin Sustrik committed
115 116 117 118
    msg_->vsm_size = 0;
    return 0;
}

119
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
Martin Sustrik's avatar
Martin Sustrik committed
120
{
Martin Sustrik's avatar
Martin Sustrik committed
121
    if (size_ <= ZMQ_MAX_VSM_SIZE) {
122
        msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
123
        msg_->flags = 0;
124
        msg_->vsm_size = (uint8_t) size_;
Martin Sustrik's avatar
Martin Sustrik committed
125 126
    }
    else {
127 128
        msg_->content =
            (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_);
Martin Sustrik's avatar
Martin Sustrik committed
129 130 131 132
        if (!msg_->content) {
            errno = ENOMEM;
            return -1;
        }
133 134
        msg_->flags = 0;
        
135 136 137 138
        zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
        content->data = (void*) (content + 1);
        content->size = size_;
        content->ffn = NULL;
139
        content->hint = NULL;
140
        new (&content->refcnt) zmq::atomic_counter_t ();
Martin Sustrik's avatar
Martin Sustrik committed
141 142 143 144
    }
    return 0;
}

145
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
146
    zmq_free_fn *ffn_, void *hint_)
Martin Sustrik's avatar
Martin Sustrik committed
147
{
148
    msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
Martin Sustrik's avatar
Martin Sustrik committed
149
    zmq_assert (msg_->content);
150
    msg_->flags = 0;
151 152 153 154
    zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
    content->data = data_;
    content->size = size_;
    content->ffn = ffn_;
155
    content->hint = hint_;
156
    new (&content->refcnt) zmq::atomic_counter_t ();
Martin Sustrik's avatar
Martin Sustrik committed
157 158 159
    return 0;
}

160
int zmq_msg_close (zmq_msg_t *msg_)
Martin Sustrik's avatar
Martin Sustrik committed
161
{
162 163
    //  For VSMs and delimiters there are no resources to free.
    if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER ||
164
          msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
Martin Sustrik's avatar
Martin Sustrik committed
165 166
        return 0;

167
    //  If the content is not shared, or if it is shared and the reference.
Martin Sustrik's avatar
Martin Sustrik committed
168
    //  count has dropped to zero, deallocate it.
169
    zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
170
    if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) {
Martin Sustrik's avatar
Martin Sustrik committed
171

172
        //  We used "placement new" operator to initialize the reference.
Martin Sustrik's avatar
Martin Sustrik committed
173
        //  counter so we call its destructor now.
174
        content->refcnt.~atomic_counter_t ();
Martin Sustrik's avatar
Martin Sustrik committed
175

176
        if (content->ffn)
177
            content->ffn (content->data, content->hint);
178
        free (content);
Martin Sustrik's avatar
Martin Sustrik committed
179 180 181 182 183
    }

    return 0;
}

184
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
Martin Sustrik's avatar
Martin Sustrik committed
185
{
Martin Sustrik's avatar
Martin Sustrik committed
186
    zmq_msg_close (dest_);
Martin Sustrik's avatar
Martin Sustrik committed
187
    *dest_ = *src_;
Martin Sustrik's avatar
Martin Sustrik committed
188
    zmq_msg_init (src_);
Martin Sustrik's avatar
Martin Sustrik committed
189 190 191
    return 0;
}

192
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
Martin Sustrik's avatar
Martin Sustrik committed
193
{
Martin Sustrik's avatar
Martin Sustrik committed
194
    zmq_msg_close (dest_);
Martin Sustrik's avatar
Martin Sustrik committed
195 196

    //  VSMs and delimiters require no special handling.
197 198
    if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
          src_->content != (zmq::msg_content_t*) ZMQ_VSM) {
Martin Sustrik's avatar
Martin Sustrik committed
199 200 201

        //  One reference is added to shared messages. Non-shared messages
        //  are turned into shared messages and reference count is set to 2.
202
        zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
203
        if (src_->flags & ZMQ_MSG_SHARED)
204
            content->refcnt.add (1);
Martin Sustrik's avatar
Martin Sustrik committed
205
        else {
206
            src_->flags |= ZMQ_MSG_SHARED;
207
            content->refcnt.set (2);
Martin Sustrik's avatar
Martin Sustrik committed
208 209 210 211 212 213 214
        }
    }

    *dest_ = *src_;
    return 0;
}

215
void *zmq_msg_data (zmq_msg_t *msg_)
Martin Sustrik's avatar
Martin Sustrik committed
216
{
217
    if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
Martin Sustrik's avatar
Martin Sustrik committed
218
        return msg_->vsm_data;
219
    if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
Martin Sustrik's avatar
Martin Sustrik committed
220
        return NULL;
221 222

    return ((zmq::msg_content_t*) msg_->content)->data;
Martin Sustrik's avatar
Martin Sustrik committed
223 224
}

225
size_t zmq_msg_size (zmq_msg_t *msg_)
Martin Sustrik's avatar
Martin Sustrik committed
226
{
227
    if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
Martin Sustrik's avatar
Martin Sustrik committed
228
        return msg_->vsm_size;
229
    if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
Martin Sustrik's avatar
Martin Sustrik committed
230
        return 0;
231 232

    return ((zmq::msg_content_t*) msg_->content)->size;
Martin Sustrik's avatar
Martin Sustrik committed
233 234
}

235
void *zmq_init (int io_threads_)
Martin Sustrik's avatar
Martin Sustrik committed
236
{
237 238 239 240
    if (io_threads_ < 0) {
        errno = EINVAL;
        return NULL;
    }
Martin Sustrik's avatar
Martin Sustrik committed
241

242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
#if defined ZMQ_HAVE_OPENPGM
    //  Unfortunately, OpenPGM doesn't support refcounted init/shutdown, thus,
    //  let's fail if it was initialised beforehand.
    zmq_assert (!pgm_supported ());

    //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
    //  protocol ID. Note that if you want to use gettimeofday and sleep for
    //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
    //  PGM_SLEEP to "USLEEP".
    GError *pgm_error = NULL;
    int rc = pgm_init (&pgm_error);
    if (rc != TRUE) {
        if (pgm_error->domain == PGM_IF_ERROR && (
              pgm_error->code == PGM_IF_ERROR_INVAL ||
              pgm_error->code == PGM_IF_ERROR_XDEV ||
              pgm_error->code == PGM_IF_ERROR_NODEV ||
              pgm_error->code == PGM_IF_ERROR_NOTUNIQ ||
              pgm_error->code == PGM_IF_ERROR_ADDRFAMILY ||
              pgm_error->code == PGM_IF_ERROR_FAMILY ||
              pgm_error->code == PGM_IF_ERROR_NODATA ||
              pgm_error->code == PGM_IF_ERROR_NONAME ||
              pgm_error->code == PGM_IF_ERROR_SERVICE)) {
            g_error_free (pgm_error);
            errno = EINVAL;
            return NULL;
        }
        zmq_assert (false);
    }
#endif

    //  Create 0MQ context.
273 274 275
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
    zmq_assert (ctx);
    return (void*) ctx;
Martin Sustrik's avatar
Martin Sustrik committed
276 277
}

278
int zmq_term (void *ctx_)
Martin Sustrik's avatar
Martin Sustrik committed
279
{
280 281 282 283
    if (!ctx_) {
        errno = EFAULT;
        return -1;
    }
284 285 286 287

    int rc = ((zmq::ctx_t*) ctx_)->term ();
    int en = errno;

288 289 290 291 292 293 294 295
#if defined ZMQ_HAVE_OPENPGM
    //  Shut down the OpenPGM library.
    if (pgm_shutdown () != TRUE)
        zmq_assert (false);
#endif

    errno = en;
    return rc;
Martin Sustrik's avatar
Martin Sustrik committed
296 297
}

298
void *zmq_socket (void *ctx_, int type_)
Martin Sustrik's avatar
Martin Sustrik committed
299
{
300 301 302 303
    if (!ctx_) {
        errno = EFAULT;
        return NULL;
    }
304
    return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_));
Martin Sustrik's avatar
Martin Sustrik committed
305 306
}

Martin Sustrik's avatar
Martin Sustrik committed
307
int zmq_close (void *s_)
Martin Sustrik's avatar
Martin Sustrik committed
308
{
309 310 311 312
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
313
    ((zmq::socket_base_t*) s_)->close ();
Martin Sustrik's avatar
Martin Sustrik committed
314 315 316
    return 0;
}

317 318
int zmq_setsockopt (void *s_, int option_, const void *optval_,
    size_t optvallen_)
Martin Sustrik's avatar
Martin Sustrik committed
319
{
320 321 322 323
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
324 325
    return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_,
        optvallen_));
Martin Sustrik's avatar
Martin Sustrik committed
326 327
}

328 329
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
330 331 332 333
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
334 335 336 337
    return (((zmq::socket_base_t*) s_)->getsockopt (option_, optval_,
        optvallen_));
}

338
int zmq_bind (void *s_, const char *addr_)
Martin Sustrik's avatar
Martin Sustrik committed
339
{
340 341 342 343
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
344
    return (((zmq::socket_base_t*) s_)->bind (addr_));
345 346 347 348
}

int zmq_connect (void *s_, const char *addr_)
{
349 350 351 352
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
353
    return (((zmq::socket_base_t*) s_)->connect (addr_));
Martin Sustrik's avatar
Martin Sustrik committed
354 355
}

356
int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
Martin Sustrik's avatar
Martin Sustrik committed
357
{
358 359 360 361
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
362
    return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
Martin Sustrik's avatar
Martin Sustrik committed
363 364
}

365
int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
Martin Sustrik's avatar
Martin Sustrik committed
366
{
367 368 369 370
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
371
    return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
Martin Sustrik's avatar
Martin Sustrik committed
372
}
373

374
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
375
{
376 377 378
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
Martin Lucina's avatar
Martin Lucina committed
379 380
    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
    defined ZMQ_HAVE_NETBSD
381

382 383 384 385
    if (!items_) {
        errno = EFAULT;
        return -1;
    }
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435
    pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
    zmq_assert (pollfds);
    int npollfds = 0;
    int nsockets = 0;

    zmq::app_thread_t *app_thread = NULL;

    for (int i = 0; i != nitems_; i++) {

        //  0MQ sockets.
        if (items_ [i].socket) {

            //  Get the app_thread the socket is living in. If there are two
            //  sockets in the same pollset with different app threads, fail.
            zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
            if (app_thread) {
                if (app_thread != s->get_thread ()) {
                    free (pollfds);
                    errno = EFAULT;
                    return -1;
                }
            }
            else
                app_thread = s->get_thread ();

            nsockets++;
            continue;
        }

        //  Raw file descriptors.
        pollfds [npollfds].fd = items_ [i].fd;
        pollfds [npollfds].events =
            (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
            (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
        npollfds++;
    }

    //  If there's at least one 0MQ socket in the pollset we have to poll
    //  for 0MQ commands. If ZMQ_POLL was not set, fail.
    if (nsockets) {
        pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd ();
        if (pollfds [npollfds].fd == zmq::retired_fd) {
            free (pollfds);
            errno = ENOTSUP;
            return -1;
        }
        pollfds [npollfds].events = POLLIN;
        npollfds++;
    }

436 437 438 439
    //  First iteration just check for events, don't block. Waiting would
    //  prevent exiting on any events that may already been signaled on
    //  0MQ sockets.
    int rc = poll (pollfds, npollfds, 0);
440
    if (rc == -1 && errno == EINTR && timeout_ >= 0) {
441 442 443
        free (pollfds);
        return 0;
    }
444
    errno_assert (rc >= 0 || (rc == -1 && errno == EINTR));
445

446
    int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;
447
    int nevents = 0;
448

449
    while (true) {
450 451 452

        //  Process 0MQ commands if needed.
        if (nsockets && pollfds [npollfds -1].revents & POLLIN)
453 454 455 456 457
            if (!app_thread->process_commands (false, false)) {
                free (pollfds);
                errno = ETERM;
                return -1;
            }
458 459 460 461 462 463 464 465

        //  Check for the events.
        int pollfd_pos = 0;
        for (int i = 0; i != nitems_; i++) {

            //  If the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            if (!items_ [i].socket) {
466 467 468 469 470 471 472 473
                items_ [i].revents = 0;
                if (pollfds [pollfd_pos].revents & POLLIN)
                    items_ [i].revents |= ZMQ_POLLIN;
                if (pollfds [pollfd_pos].revents & POLLOUT)
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (pollfds [pollfd_pos].revents & ~(POLLIN | POLLOUT))
                    items_ [i].revents |= ZMQ_POLLERR;

474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489
                if (items_ [i].revents)
                    nevents++;
                pollfd_pos++;
                continue;
            }

            //  The poll item is a 0MQ socket.
            zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
            items_ [i].revents = 0;
            if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
                items_ [i].revents |= ZMQ_POLLOUT;
            if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
                items_ [i].revents |= ZMQ_POLLIN;
            if (items_ [i].revents)
                nevents++;
        }
490 491 492

        //  If there's at least one event, or if we are asked not to block,
        //  return immediately.
493
        if (nevents || !timeout_)
494 495
            break;

496 497 498 499 500 501 502 503 504 505 506 507
        //  Wait for events. Ignore interrupts if there's infinite timeout.
        while (true) {
            rc = poll (pollfds, npollfds, timeout);
            if (rc == -1 && errno == EINTR) {
                if (timeout_ < 0)
                    continue;
                else {
                    rc = 0;
                    break;
                }
            }
            errno_assert (rc >= 0);
508
            break;
509 510
        }
        
511 512 513 514 515 516 517 518 519
        //  If timeout was hit with no events signaled, return zero.
        if (rc == 0)
            break;

        //  If timeout was already applied, we don't want to poll anymore.
        //  Setting timeout to zero will cause termination of the function
        //  once the events we've got are processed.
        if (timeout > 0)
            timeout = 0;
520 521 522 523 524
    }

    free (pollfds);
    return nevents;

525 526 527 528 529 530 531 532 533 534 535 536 537 538
#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS

    fd_set pollset_in;
    FD_ZERO (&pollset_in);
    fd_set pollset_out;
    FD_ZERO (&pollset_out);
    fd_set pollset_err;
    FD_ZERO (&pollset_err);

    zmq::app_thread_t *app_thread = NULL;
    int nsockets = 0;
    zmq::fd_t maxfd = zmq::retired_fd;
    zmq::fd_t notify_fd = zmq::retired_fd;

539 540 541 542
    //  Ensure we do not attempt to select () on more than FD_SETSIZE
    //  file descriptors.
    zmq_assert (nitems_ <= FD_SETSIZE);

543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568
    for (int i = 0; i != nitems_; i++) {

        //  0MQ sockets.
        if (items_ [i].socket) {

            //  Get the app_thread the socket is living in. If there are two
            //  sockets in the same pollset with different app threads, fail.
            zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
            if (app_thread) {
                if (app_thread != s->get_thread ()) {
                    errno = EFAULT;
                    return -1;
                }
            }
            else
                app_thread = s->get_thread ();

            nsockets++;
            continue;
        }

        //  Raw file descriptors.
        if (items_ [i].events & ZMQ_POLLIN)
            FD_SET (items_ [i].fd, &pollset_in);
        if (items_ [i].events & ZMQ_POLLOUT)
            FD_SET (items_ [i].fd, &pollset_out);
569 570
        if (items_ [i].events & ZMQ_POLLERR)
            FD_SET (items_ [i].fd, &pollset_err);
571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587
        if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd)
            maxfd = items_ [i].fd;
    }

    //  If there's at least one 0MQ socket in the pollset we have to poll
    //  for 0MQ commands. If ZMQ_POLL was not set, fail.
    if (nsockets) {
        notify_fd = app_thread->get_signaler ()->get_fd ();
        if (notify_fd == zmq::retired_fd) {
            errno = ENOTSUP;
            return -1;
        }
        FD_SET (notify_fd, &pollset_in);
        if (maxfd == zmq::retired_fd || maxfd < notify_fd)
            maxfd = notify_fd;
    }

588
    bool block = (timeout_ < 0);
589 590
    timeval timeout = {timeout_ / 1000000, timeout_ % 1000000};
    timeval zero_timeout = {0, 0};
591 592
    int nevents = 0;

593 594 595
    //  First iteration just check for events, don't block. Waiting would
    //  prevent exiting on any events that may already been signaled on
    //  0MQ sockets.
596 597 598 599 600
    fd_set inset, outset, errset;
    memcpy (&inset, &pollset_in, sizeof (fd_set));
    memcpy (&outset, &pollset_out, sizeof (fd_set));
    memcpy (&errset, &pollset_err, sizeof (fd_set));
    int rc = select (maxfd, &inset, &outset, &errset, &zero_timeout);
601
#if defined ZMQ_HAVE_WINDOWS
602
    wsa_assert (rc != SOCKET_ERROR);
603
#else
604 605 606
    if (rc == -1 && errno == EINTR && timeout_ >= 0)
        return 0;
    errno_assert (rc >= 0 || (rc == -1 && errno == EINTR));
607
#endif
608

609
    while (true) {
610 611

        //  Process 0MQ commands if needed.
612
        if (nsockets && FD_ISSET (notify_fd, &inset))
613 614 615 616
            if (!app_thread->process_commands (false, false)) {
                errno = ETERM;
                return -1;
            }
617 618 619 620 621 622 623

        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {

            //  If the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            if (!items_ [i].socket) {
624 625 626 627 628 629 630
                items_ [i].revents = 0;
                if (FD_ISSET (items_ [i].fd, &inset))
                    items_ [i].revents |= ZMQ_POLLIN;
                if (FD_ISSET (items_ [i].fd, &outset))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (FD_ISSET (items_ [i].fd, &errset))
                    items_ [i].revents |= ZMQ_POLLERR;
631 632 633 634 635 636 637 638 639 640 641 642 643 644 645
                if (items_ [i].revents)
                    nevents++;
                continue;
            }

            //  The poll item is a 0MQ socket.
            zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
            items_ [i].revents = 0;
            if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
                items_ [i].revents |= ZMQ_POLLOUT;
            if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
                items_ [i].revents |= ZMQ_POLLIN;
            if (items_ [i].revents)
                nevents++;
        }
646 647 648 649 650 651

        //  If there's at least one event, or if we are asked not to block,
        //  return immediately.
        if (nevents || (timeout.tv_sec == 0 && timeout.tv_usec == 0))
            break;

652 653 654 655 656
        //  Wait for events. Ignore interrupts if there's infinite timeout.
        while (true) {
            memcpy (&inset, &pollset_in, sizeof (fd_set));
            memcpy (&outset, &pollset_out, sizeof (fd_set));
            memcpy (&errset, &pollset_err, sizeof (fd_set));
657
            rc = select (maxfd, &inset, &outset, &errset,
658
                block ? NULL : &timeout);
659
#if defined ZMQ_HAVE_WINDOWS
660
            wsa_assert (rc != SOCKET_ERROR);
661
#else
662 663 664 665 666 667 668 669 670
            if (rc == -1 && errno == EINTR) {
                if (timeout_ < 0)
                    continue;
                else {
                    rc = 0;
                    break;
                }
            }
            errno_assert (rc >= 0);
671
#endif
672 673
            break;
        }
674 675 676 677 678 679 680 681 682 683
        
        //  If timeout was hit with no events signaled, return zero.
        if (rc == 0)
            break;

        //  If timeout was already applied, we don't want to poll anymore.
        //  Setting timeout to zero will cause termination of the function
        //  once the events we've got are processed.
        if (!block)
            timeout = zero_timeout;
684 685 686 687 688 689 690
    }

    return nevents;

#else
    errno = ENOTSUP;
    return -1;
691 692 693
#endif
}

694 695 696 697 698
int zmq_errno ()
{
    return errno;
}

699 700
int zmq_device (int device_, void *insocket_, void *outsocket_)
{
701 702 703 704
    if (!insocket_ || !outsocket_) {
        errno = EFAULT;
        return -1;
    }
705 706 707 708 709 710 711 712 713 714 715 716 717 718
    switch (device_) {
    case ZMQ_FORWARDER:
        return zmq::forwarder ((zmq::socket_base_t*) insocket_,
            (zmq::socket_base_t*) outsocket_);
    case ZMQ_QUEUE:
        return zmq::queue ((zmq::socket_base_t*) insocket_,
            (zmq::socket_base_t*) outsocket_);
    case ZMQ_STREAMER:
        return zmq::streamer ((zmq::socket_base_t*) insocket_,
            (zmq::socket_base_t*) outsocket_);
    default:
        return EINVAL;
    }
}
719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781

////////////////////////////////////////////////////////////////////////////////
//  0MQ utils - to be used by perf tests
////////////////////////////////////////////////////////////////////////////////

#if defined ZMQ_HAVE_WINDOWS

static uint64_t now ()
{    
    //  Get the high resolution counter's accuracy.
    LARGE_INTEGER ticksPerSecond;
    QueryPerformanceFrequency (&ticksPerSecond);

    //  What time is it?
    LARGE_INTEGER tick;
    QueryPerformanceCounter (&tick);

    //  Convert the tick number into the number of seconds
    //  since the system was started.
    double ticks_div = (double) (ticksPerSecond.QuadPart / 1000000);     
    return (uint64_t) (tick.QuadPart / ticks_div);
}

void zmq_sleep (int seconds_)
{
    Sleep (seconds_ * 1000);
}

#else

static uint64_t now ()
{
    struct timeval tv;
    int rc;

    rc = gettimeofday (&tv, NULL);
    assert (rc == 0);
    return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec);
}

void zmq_sleep (int seconds_)
{
    sleep (seconds_);
}

#endif

void *zmq_stopwatch_start ()
{
    uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t));
    assert (watch);
    *watch = now ();
    return (void*) watch;
}

unsigned long zmq_stopwatch_stop (void *watch_)
{
    uint64_t end = now ();
    uint64_t start = *(uint64_t*) watch_;
    free (watch_);
    return (unsigned long) (end - start);
}