zmq.cpp 20.2 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2010 iMatix Corporation
Martin Sustrik's avatar
Martin Sustrik committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "../include/zmq.h"
Martin Sustrik's avatar
Martin Sustrik committed
21
#include "../include/zmq_utils.h"
Martin Sustrik's avatar
Martin Sustrik committed
22

23
#include <string.h>
Martin Sustrik's avatar
Martin Sustrik committed
24 25 26 27
#include <errno.h>
#include <stdlib.h>
#include <new>

28 29 30
#include "forwarder.hpp"
#include "queue.hpp"
#include "streamer.hpp"
31
#include "socket_base.hpp"
32
#include "msg_content.hpp"
33 34
#include "platform.hpp"
#include "stdint.hpp"
35
#include "config.hpp"
36
#include "ctx.hpp"
37
#include "err.hpp"
38
#include "fd.hpp"
39

40 41 42
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
Martin Lucina's avatar
Martin Lucina committed
43 44
    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
    defined ZMQ_HAVE_NETBSD
45 46
#include <poll.h>
#endif
47 48 49 50 51

#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#include <sys/time.h>
#endif
Martin Sustrik's avatar
Martin Sustrik committed
52

53 54 55 56
#if defined ZMQ_HAVE_OPENPGM
#include <pgm/pgm.h>
#endif

57 58
void zmq_version (int *major_, int *minor_, int *patch_)
{
59 60 61
    *major_ = PACKAGE_VERSION_MAJOR;
    *minor_ = PACKAGE_VERSION_MINOR;
    *patch_ = PACKAGE_VERSION_PATCH;
62 63
}

64 65 66
const char *zmq_strerror (int errnum_)
{
    switch (errnum_) {
67 68 69 70 71
#if defined ZMQ_HAVE_WINDOWS
    case ENOTSUP:
        return "Not supported";
    case EPROTONOSUPPORT:
        return "Protocol not supported";
72 73 74 75 76 77 78 79
    case ENOBUFS:
        return "No buffer space available";
    case ENETDOWN:
        return "Network is down";
    case EADDRINUSE:
        return "Address in use";
    case EADDRNOTAVAIL:
        return "Address not available";
80 81 82 83
    case ECONNREFUSED:
        return "Connection refused";
    case EINPROGRESS:
        return "Operation in progress";
84
#endif
85 86 87 88
    case EFSM:
        return "Operation cannot be accomplished in current state";
    case ENOCOMPATPROTO:
        return "The protocol is not compatible with the socket type";
89 90
    case ETERM:
        return "Context was terminated";
91 92
    case EMTHREAD:
        return "No thread available";
93
    default:
94 95 96 97
#if defined _MSC_VER
#pragma warning (push)
#pragma warning (disable:4996)
#endif
98
        return strerror (errnum_);
99 100 101
#if defined _MSC_VER
#pragma warning (pop)
#endif
102 103 104
    }
}

105
int zmq_msg_init (zmq_msg_t *msg_)
Martin Sustrik's avatar
Martin Sustrik committed
106
{
107
    msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
108
    msg_->flags = 0;
Martin Sustrik's avatar
Martin Sustrik committed
109 110 111 112
    msg_->vsm_size = 0;
    return 0;
}

113
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
Martin Sustrik's avatar
Martin Sustrik committed
114
{
Martin Sustrik's avatar
Martin Sustrik committed
115
    if (size_ <= ZMQ_MAX_VSM_SIZE) {
116
        msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
117
        msg_->flags = 0;
118
        msg_->vsm_size = (uint8_t) size_;
Martin Sustrik's avatar
Martin Sustrik committed
119 120
    }
    else {
121 122
        msg_->content =
            (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_);
Martin Sustrik's avatar
Martin Sustrik committed
123 124 125 126
        if (!msg_->content) {
            errno = ENOMEM;
            return -1;
        }
127 128
        msg_->flags = 0;
        
129 130 131 132
        zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
        content->data = (void*) (content + 1);
        content->size = size_;
        content->ffn = NULL;
133
        content->hint = NULL;
134
        new (&content->refcnt) zmq::atomic_counter_t ();
Martin Sustrik's avatar
Martin Sustrik committed
135 136 137 138
    }
    return 0;
}

139
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
140
    zmq_free_fn *ffn_, void *hint_)
Martin Sustrik's avatar
Martin Sustrik committed
141
{
142
    msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
Martin Sustrik's avatar
Martin Sustrik committed
143
    zmq_assert (msg_->content);
144
    msg_->flags = 0;
145 146 147 148
    zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
    content->data = data_;
    content->size = size_;
    content->ffn = ffn_;
149
    content->hint = hint_;
150
    new (&content->refcnt) zmq::atomic_counter_t ();
Martin Sustrik's avatar
Martin Sustrik committed
151 152 153
    return 0;
}

154
int zmq_msg_close (zmq_msg_t *msg_)
Martin Sustrik's avatar
Martin Sustrik committed
155
{
156 157
    //  For VSMs and delimiters there are no resources to free.
    if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER ||
158
          msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
Martin Sustrik's avatar
Martin Sustrik committed
159 160
        return 0;

161
    //  If the content is not shared, or if it is shared and the reference.
Martin Sustrik's avatar
Martin Sustrik committed
162
    //  count has dropped to zero, deallocate it.
163
    zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
164
    if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) {
Martin Sustrik's avatar
Martin Sustrik committed
165

166
        //  We used "placement new" operator to initialize the reference.
Martin Sustrik's avatar
Martin Sustrik committed
167
        //  counter so we call its destructor now.
168
        content->refcnt.~atomic_counter_t ();
Martin Sustrik's avatar
Martin Sustrik committed
169

170
        if (content->ffn)
171
            content->ffn (content->data, content->hint);
172
        free (content);
Martin Sustrik's avatar
Martin Sustrik committed
173 174 175 176 177
    }

    return 0;
}

178
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
Martin Sustrik's avatar
Martin Sustrik committed
179
{
Martin Sustrik's avatar
Martin Sustrik committed
180
    zmq_msg_close (dest_);
Martin Sustrik's avatar
Martin Sustrik committed
181
    *dest_ = *src_;
Martin Sustrik's avatar
Martin Sustrik committed
182
    zmq_msg_init (src_);
Martin Sustrik's avatar
Martin Sustrik committed
183 184 185
    return 0;
}

186
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
Martin Sustrik's avatar
Martin Sustrik committed
187
{
Martin Sustrik's avatar
Martin Sustrik committed
188
    zmq_msg_close (dest_);
Martin Sustrik's avatar
Martin Sustrik committed
189 190

    //  VSMs and delimiters require no special handling.
191 192
    if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
          src_->content != (zmq::msg_content_t*) ZMQ_VSM) {
Martin Sustrik's avatar
Martin Sustrik committed
193 194 195

        //  One reference is added to shared messages. Non-shared messages
        //  are turned into shared messages and reference count is set to 2.
196
        zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
197
        if (src_->flags & ZMQ_MSG_SHARED)
198
            content->refcnt.add (1);
Martin Sustrik's avatar
Martin Sustrik committed
199
        else {
200
            src_->flags |= ZMQ_MSG_SHARED;
201
            content->refcnt.set (2);
Martin Sustrik's avatar
Martin Sustrik committed
202 203 204 205 206 207 208
        }
    }

    *dest_ = *src_;
    return 0;
}

209
void *zmq_msg_data (zmq_msg_t *msg_)
Martin Sustrik's avatar
Martin Sustrik committed
210
{
211
    if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
Martin Sustrik's avatar
Martin Sustrik committed
212
        return msg_->vsm_data;
213
    if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
Martin Sustrik's avatar
Martin Sustrik committed
214
        return NULL;
215 216

    return ((zmq::msg_content_t*) msg_->content)->data;
Martin Sustrik's avatar
Martin Sustrik committed
217 218
}

219
size_t zmq_msg_size (zmq_msg_t *msg_)
Martin Sustrik's avatar
Martin Sustrik committed
220
{
221
    if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
Martin Sustrik's avatar
Martin Sustrik committed
222
        return msg_->vsm_size;
223
    if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
Martin Sustrik's avatar
Martin Sustrik committed
224
        return 0;
225 226

    return ((zmq::msg_content_t*) msg_->content)->size;
Martin Sustrik's avatar
Martin Sustrik committed
227 228
}

229
void *zmq_init (int io_threads_)
Martin Sustrik's avatar
Martin Sustrik committed
230
{
231 232 233 234
    if (io_threads_ < 0) {
        errno = EINVAL;
        return NULL;
    }
Martin Sustrik's avatar
Martin Sustrik committed
235

236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
#if defined ZMQ_HAVE_OPENPGM
    //  Unfortunately, OpenPGM doesn't support refcounted init/shutdown, thus,
    //  let's fail if it was initialised beforehand.
    zmq_assert (!pgm_supported ());

    //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
    //  protocol ID. Note that if you want to use gettimeofday and sleep for
    //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
    //  PGM_SLEEP to "USLEEP".
    GError *pgm_error = NULL;
    int rc = pgm_init (&pgm_error);
    if (rc != TRUE) {
        if (pgm_error->domain == PGM_IF_ERROR && (
              pgm_error->code == PGM_IF_ERROR_INVAL ||
              pgm_error->code == PGM_IF_ERROR_XDEV ||
              pgm_error->code == PGM_IF_ERROR_NODEV ||
              pgm_error->code == PGM_IF_ERROR_NOTUNIQ ||
              pgm_error->code == PGM_IF_ERROR_ADDRFAMILY ||
              pgm_error->code == PGM_IF_ERROR_FAMILY ||
              pgm_error->code == PGM_IF_ERROR_NODATA ||
              pgm_error->code == PGM_IF_ERROR_NONAME ||
              pgm_error->code == PGM_IF_ERROR_SERVICE)) {
            g_error_free (pgm_error);
            errno = EINVAL;
            return NULL;
        }
        zmq_assert (false);
    }
#endif

    //  Create 0MQ context.
267 268 269
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
    zmq_assert (ctx);
    return (void*) ctx;
Martin Sustrik's avatar
Martin Sustrik committed
270 271
}

272
int zmq_term (void *ctx_)
Martin Sustrik's avatar
Martin Sustrik committed
273
{
274 275 276 277
    if (!ctx_) {
        errno = EFAULT;
        return -1;
    }
278 279 280 281

    int rc = ((zmq::ctx_t*) ctx_)->terminate ();
    int en = errno;

282 283 284 285 286 287 288 289
#if defined ZMQ_HAVE_OPENPGM
    //  Shut down the OpenPGM library.
    if (pgm_shutdown () != TRUE)
        zmq_assert (false);
#endif

    errno = en;
    return rc;
Martin Sustrik's avatar
Martin Sustrik committed
290 291
}

292
void *zmq_socket (void *ctx_, int type_)
Martin Sustrik's avatar
Martin Sustrik committed
293
{
294 295 296 297
    if (!ctx_) {
        errno = EFAULT;
        return NULL;
    }
298
    return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_));
Martin Sustrik's avatar
Martin Sustrik committed
299 300
}

Martin Sustrik's avatar
Martin Sustrik committed
301
int zmq_close (void *s_)
Martin Sustrik's avatar
Martin Sustrik committed
302
{
303 304 305 306
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
307
    ((zmq::socket_base_t*) s_)->close ();
Martin Sustrik's avatar
Martin Sustrik committed
308 309 310
    return 0;
}

311 312
int zmq_setsockopt (void *s_, int option_, const void *optval_,
    size_t optvallen_)
Martin Sustrik's avatar
Martin Sustrik committed
313
{
314 315 316 317
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
318 319
    return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_,
        optvallen_));
Martin Sustrik's avatar
Martin Sustrik committed
320 321
}

322 323
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
324 325 326 327
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
328 329 330 331
    return (((zmq::socket_base_t*) s_)->getsockopt (option_, optval_,
        optvallen_));
}

332
int zmq_bind (void *s_, const char *addr_)
Martin Sustrik's avatar
Martin Sustrik committed
333
{
334 335 336 337
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
338
    return (((zmq::socket_base_t*) s_)->bind (addr_));
339 340 341 342
}

int zmq_connect (void *s_, const char *addr_)
{
343 344 345 346
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
347
    return (((zmq::socket_base_t*) s_)->connect (addr_));
Martin Sustrik's avatar
Martin Sustrik committed
348 349
}

350
int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
Martin Sustrik's avatar
Martin Sustrik committed
351
{
352 353 354 355
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
356
    return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
Martin Sustrik's avatar
Martin Sustrik committed
357 358
}

359
int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
Martin Sustrik's avatar
Martin Sustrik committed
360
{
361 362 363 364
    if (!s_) {
        errno = EFAULT;
        return -1;
    }
365
    return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
Martin Sustrik's avatar
Martin Sustrik committed
366
}
367

368
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
369
{
370 371 372
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
Martin Lucina's avatar
Martin Lucina committed
373 374
    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
    defined ZMQ_HAVE_NETBSD
375

376 377 378 379
    if (!items_) {
        errno = EFAULT;
        return -1;
    }
380

381 382 383
    pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
    zmq_assert (pollfds);

384
    //  Build pollset for poll () system call.
385 386
    for (int i = 0; i != nitems_; i++) {

387 388
        //  If the poll item is a 0MQ socket, we poll on the file descriptor
        //  retrieved by the ZMQ_FD socket option.
389
        if (items_ [i].socket) {
390
            size_t zmq_fd_size = sizeof (zmq::fd_t);
391 392 393 394
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
                &zmq_fd_size) == -1) {
                free (pollfds);
                return -1;
395
            }
396
            pollfds [i].events = items_ [i].events ? POLLIN : 0;
397
        }
398 399
        //  Else, the poll item is a raw file descriptor. Just convert the
        //  events to normal POLLIN/POLLOUT for poll ().
Martin Lucina's avatar
Martin Lucina committed
400
        else {
401 402 403 404
            pollfds [i].fd = items_ [i].fd;
            pollfds [i].events =
                (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
                (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
Martin Lucina's avatar
Martin Lucina committed
405
        }
406 407
    }

408
    bool first_pass = true;
409
    int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;
410
    int nevents = 0;
411

412
    while (true) {
413

414
        //  Wait for events.
415
        while (true) {
416
            int rc = poll (pollfds, nitems_, first_pass ? 0 : timeout);
417
            if (rc == -1 && errno == EINTR) {
418 419
                free (pollfds);
                return -1;
420
            }
421 422
            errno_assert (rc >= 0);
            break;
423
        }
424

425 426
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
427

428
            items_ [i].revents = 0;
429

430 431
            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
432
            if (items_ [i].socket) {
433 434 435 436 437 438 439 440 441 442 443 444 445
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                    &zmq_events_size) == -1) {
                    free (pollfds);
                    return -1;
                }
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
446
            }
447 448 449 450 451 452 453 454 455 456 457 458 459
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (pollfds [i].revents & POLLIN)
                    items_ [i].revents |= ZMQ_POLLIN;
                if (pollfds [i].revents & POLLOUT)
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (pollfds [i].revents & ~(POLLIN | POLLOUT))
                    items_ [i].revents |= ZMQ_POLLERR;
            }

            if (items_ [i].revents)
                nevents++;
460
        }
461

462 463 464 465 466 467 468
        //  If there are no events from the first pass (the one with no
        //  timout), do at least the second pass so that we wait.
        if (first_pass && nevents == 0 && timeout_ != 0) {
            first_pass = false;
            continue;
        }

469 470 471 472 473
        //  If timeout is set to infinite and we have to events to return
        //  we can restart the polling.
        if (timeout == -1 && nevents == 0)
            continue;

474 475 476
        //  TODO: if nevents is zero recompute timeout and loop
        //  if it is not yet reached.

477
        break;
478 479 480 481 482
    }

    free (pollfds);
    return nevents;

483 484 485 486 487 488 489 490 491
#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS

    fd_set pollset_in;
    FD_ZERO (&pollset_in);
    fd_set pollset_out;
    FD_ZERO (&pollset_out);
    fd_set pollset_err;
    FD_ZERO (&pollset_err);

492
    zmq::fd_t maxfd = 0;
493

494 495 496 497
    //  Ensure we do not attempt to select () on more than FD_SETSIZE
    //  file descriptors.
    zmq_assert (nitems_ <= FD_SETSIZE);

498
    //  Build the fd_sets for passing to select ().
499 500
    for (int i = 0; i != nitems_; i++) {

501 502
        //  If the poll item is a 0MQ socket we are interested in input on the
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
503
        if (items_ [i].socket) {
504 505 506 507 508
            size_t zmq_fd_size = sizeof (zmq::fd_t);
            zmq::fd_t notify_fd;
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
                &zmq_fd_size) == -1)
                return -1;
509 510 511 512 513
            if (items_ [i].events) {
                FD_SET (notify_fd, &pollset_in);
                if (maxfd < notify_fd)
                    maxfd = notify_fd;
            }
514
        }
515 516 517 518 519 520 521 522 523 524 525
        //  Else, the poll item is a raw file descriptor. Convert the poll item
        //  events to the appropriate fd_sets.
        else {
            if (items_ [i].events & ZMQ_POLLIN)
                FD_SET (items_ [i].fd, &pollset_in);
            if (items_ [i].events & ZMQ_POLLOUT)
                FD_SET (items_ [i].fd, &pollset_out);
            if (items_ [i].events & ZMQ_POLLERR)
                FD_SET (items_ [i].fd, &pollset_err);
            if (maxfd < items_ [i].fd)
                maxfd = items_ [i].fd;
526 527 528
        }
    }

529 530
    bool first_pass = true;
    timeval zero_timeout = {0, 0};
531
    timeval timeout = {timeout_ / 1000000, timeout_ % 1000000};
532
    int nevents = 0;
533
    fd_set inset, outset, errset;
534 535

    while (true) {
536 537 538 539 540 541 542

        //  Wait for events. Ignore interrupts if there's infinite timeout.
        while (true) {
            memcpy (&inset, &pollset_in, sizeof (fd_set));
            memcpy (&outset, &pollset_out, sizeof (fd_set));
            memcpy (&errset, &pollset_err, sizeof (fd_set));
            int rc = select (maxfd, &inset, &outset, &errset,
543
                first_pass ? &zero_timeout : (timeout_ < 0 ? NULL : &timeout));
544
#if defined ZMQ_HAVE_WINDOWS
545
            wsa_assert (rc != SOCKET_ERROR);
546
#else
547 548
            if (rc == -1 && errno == EINTR)
                return -1;
549
            errno_assert (rc >= 0);
550
#endif
551 552
            break;
        }
553

554 555
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
556

557
            items_ [i].revents = 0;
558

559 560 561 562 563 564
            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
            if (items_ [i].socket) {
                size_t zmq_fd_size = sizeof (zmq::fd_t);
                zmq::fd_t notify_fd;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
565
                      &zmq_fd_size) == -1)
566
                    return -1;
567 568 569 570
                if (FD_ISSET (notify_fd, &inset)) {
                    size_t zmq_events_size = sizeof (uint32_t);
                    uint32_t zmq_events;
                    if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
571
                          &zmq_events_size) == -1)
572 573
                        return -1;
                    if ((items_ [i].events & ZMQ_POLLOUT) &&
574
                          (zmq_events & ZMQ_POLLOUT))
575 576
                        items_ [i].revents |= ZMQ_POLLOUT;
                    if ((items_ [i].events & ZMQ_POLLIN) &&
577
                          (zmq_events & ZMQ_POLLIN))
578 579 580 581 582 583 584
                        items_ [i].revents |= ZMQ_POLLIN;
                }
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (FD_ISSET (items_ [i].fd, &inset))
585
                    items_ [i].revents |= ZMQ_POLLIN;
586 587 588 589
                if (FD_ISSET (items_ [i].fd, &outset))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (FD_ISSET (items_ [i].fd, &errset))
                    items_ [i].revents |= ZMQ_POLLERR;
590
            }
591 592 593

            if (items_ [i].revents)
                nevents++;
594
        }
595

596 597 598 599 600 601 602
        //  If there are no events from the first pass (the one with no
        //  timout), do at least the second pass so that we wait.
        if (first_pass && nevents == 0 && timeout_ != 0) {
            first_pass = false;
            continue;
        }

603 604 605 606 607
        //  If timeout is set to infinite and we have to events to return
        //  we can restart the polling.
        if (timeout_ < 0 && nevents == 0)
            continue;

608 609 610
        //  TODO: if nevents is zero recompute timeout and loop
        //  if it is not yet reached.

611
        break;
612 613 614 615 616
    }

    return nevents;

#else
617
    //  Exotic platforms that support neither poll() nor select().
618 619
    errno = ENOTSUP;
    return -1;
620 621 622
#endif
}

623 624 625 626 627
int zmq_errno ()
{
    return errno;
}

628 629
int zmq_device (int device_, void *insocket_, void *outsocket_)
{
630 631 632 633
    if (!insocket_ || !outsocket_) {
        errno = EFAULT;
        return -1;
    }
634 635 636 637 638 639 640 641 642 643 644 645 646 647
    switch (device_) {
    case ZMQ_FORWARDER:
        return zmq::forwarder ((zmq::socket_base_t*) insocket_,
            (zmq::socket_base_t*) outsocket_);
    case ZMQ_QUEUE:
        return zmq::queue ((zmq::socket_base_t*) insocket_,
            (zmq::socket_base_t*) outsocket_);
    case ZMQ_STREAMER:
        return zmq::streamer ((zmq::socket_base_t*) insocket_,
            (zmq::socket_base_t*) outsocket_);
    default:
        return EINVAL;
    }
}
648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710

////////////////////////////////////////////////////////////////////////////////
//  0MQ utils - to be used by perf tests
////////////////////////////////////////////////////////////////////////////////

#if defined ZMQ_HAVE_WINDOWS

static uint64_t now ()
{    
    //  Get the high resolution counter's accuracy.
    LARGE_INTEGER ticksPerSecond;
    QueryPerformanceFrequency (&ticksPerSecond);

    //  What time is it?
    LARGE_INTEGER tick;
    QueryPerformanceCounter (&tick);

    //  Convert the tick number into the number of seconds
    //  since the system was started.
    double ticks_div = (double) (ticksPerSecond.QuadPart / 1000000);     
    return (uint64_t) (tick.QuadPart / ticks_div);
}

void zmq_sleep (int seconds_)
{
    Sleep (seconds_ * 1000);
}

#else

static uint64_t now ()
{
    struct timeval tv;
    int rc;

    rc = gettimeofday (&tv, NULL);
    assert (rc == 0);
    return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec);
}

void zmq_sleep (int seconds_)
{
    sleep (seconds_);
}

#endif

void *zmq_stopwatch_start ()
{
    uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t));
    assert (watch);
    *watch = now ();
    return (void*) watch;
}

unsigned long zmq_stopwatch_stop (void *watch_)
{
    uint64_t end = now ();
    uint64_t start = *(uint64_t*) watch_;
    free (watch_);
    return (unsigned long) (end - start);
}