zmq.cpp 20.2 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2010 iMatix Corporation
Martin Sustrik's avatar
Martin Sustrik committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "../include/zmq.h"
Martin Sustrik's avatar
Martin Sustrik committed
21

22
#include <string.h>
Martin Sustrik's avatar
Martin Sustrik committed
23 24 25 26
#include <errno.h>
#include <stdlib.h>
#include <new>

27
#include "socket_base.hpp"
28
#include "app_thread.hpp"
29
#include "dispatcher.hpp"
30
#include "msg_content.hpp"
31 32
#include "platform.hpp"
#include "stdint.hpp"
33
#include "config.hpp"
34
#include "err.hpp"
35
#include "fd.hpp"
36

37 38 39
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
Martin Lucina's avatar
Martin Lucina committed
40 41
    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
    defined ZMQ_HAVE_NETBSD
42 43
#include <poll.h>
#endif
44 45 46 47 48

#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#include <sys/time.h>
#endif
Martin Sustrik's avatar
Martin Sustrik committed
49

50 51 52 53
#if defined ZMQ_HAVE_OPENPGM
#include <pgm/pgm.h>
#endif

54 55
void zmq_version (int *major_, int *minor_, int *patch_)
{
56 57 58
    *major_ = PACKAGE_VERSION_MAJOR;
    *minor_ = PACKAGE_VERSION_MINOR;
    *patch_ = PACKAGE_VERSION_PATCH;
59 60
}

61 62 63
const char *zmq_strerror (int errnum_)
{
    switch (errnum_) {
64 65 66 67 68
#if defined ZMQ_HAVE_WINDOWS
    case ENOTSUP:
        return "Not supported";
    case EPROTONOSUPPORT:
        return "Protocol not supported";
69 70 71 72 73 74 75 76
    case ENOBUFS:
        return "No buffer space available";
    case ENETDOWN:
        return "Network is down";
    case EADDRINUSE:
        return "Address in use";
    case EADDRNOTAVAIL:
        return "Address not available";
77
#endif
78 79 80 81 82 83 84
    case EMTHREAD:
        return "Number of preallocated application threads exceeded";
    case EFSM:
        return "Operation cannot be accomplished in current state";
    case ENOCOMPATPROTO:
        return "The protocol is not compatible with the socket type";
    default:
85 86 87 88
#if defined _MSC_VER
#pragma warning (push)
#pragma warning (disable:4996)
#endif
89
        return strerror (errnum_);
90 91 92
#if defined _MSC_VER
#pragma warning (pop)
#endif
93 94 95
    }
}

96
int zmq_msg_init (zmq_msg_t *msg_)
{
    //  An empty message is a zero-length VSM: the content field holds the
    //  ZMQ_VSM tag instead of a pointer to heap-allocated content, so
    //  nothing is allocated here. Always returns 0.
    msg_->vsm_size = 0;
    msg_->flags = 0;
    msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
    return 0;
}

104
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
    //  Initialise msg_ so that it can hold size_ bytes of payload.
    //  Returns 0 on success. Returns -1 with errno set to ENOMEM when the
    //  storage cannot be allocated.

    //  Payloads of up to ZMQ_MAX_VSM_SIZE bytes are flagged as VSM: no
    //  heap allocation is made and only the size is recorded.
    if (size_ <= ZMQ_MAX_VSM_SIZE) {
        msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
        msg_->flags = 0;
        msg_->vsm_size = (uint8_t) size_;
        return 0;
    }

    //  The header and the payload share a single allocation. Refuse sizes
    //  for which "header + payload" would wrap around size_t, otherwise we
    //  would allocate a tiny block while recording a huge payload size.
    if (size_ > ((size_t) -1) - sizeof (zmq::msg_content_t)) {
        errno = ENOMEM;
        return -1;
    }

    msg_->content =
        (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_);
    if (!msg_->content) {
        errno = ENOMEM;
        return -1;
    }
    msg_->flags = 0;

    //  The payload lives immediately after the header. No free function
    //  is registered; zmq_msg_close releases the whole block with free.
    zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
    content->data = (void*) (content + 1);
    content->size = size_;
    content->ffn = NULL;
    content->hint = NULL;

    //  The block came from malloc, so the reference counter has to be
    //  constructed explicitly.
    new (&content->refcnt) zmq::atomic_counter_t ();
    return 0;
}

130
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
131
    zmq_free_fn *ffn_, void *hint_)
Martin Sustrik's avatar
Martin Sustrik committed
132
{
133
    msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
Martin Sustrik's avatar
Martin Sustrik committed
134
    zmq_assert (msg_->content);
135
    msg_->flags = 0;
136 137 138 139
    zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
    content->data = data_;
    content->size = size_;
    content->ffn = ffn_;
140
    content->hint = hint_;
141
    new (&content->refcnt) zmq::atomic_counter_t ();
Martin Sustrik's avatar
Martin Sustrik committed
142 143 144
    return 0;
}

145
int zmq_msg_close (zmq_msg_t *msg_)
{
    //  Release whatever the message owns. Always returns 0.

    //  Delimiters and VSMs carry a tag in the content field rather than
    //  a pointer, so there is nothing to free for them.
    const bool is_delimiter =
        msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER;
    const bool is_vsm = msg_->content == (zmq::msg_content_t*) ZMQ_VSM;
    if (is_delimiter || is_vsm)
        return 0;

    zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;

    //  A shared content block stays alive for as long as dropping our
    //  reference leaves the counter non-zero. The counter is touched only
    //  for messages flagged as shared.
    if ((msg_->flags & ZMQ_MSG_SHARED) && content->refcnt.sub (1))
        return 0;

    //  The counter was created with placement new, so it is destroyed by
    //  hand before the memory goes away.
    content->refcnt.~atomic_counter_t ();

    //  Let the owner of the buffer release it, then drop the header.
    if (content->ffn)
        content->ffn (content->data, content->hint);
    free (content);

    return 0;
}

169
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    //  Transfer the content of src_ into dest_. Whatever dest_ held before
    //  is released and src_ is left as an empty message. Always returns 0.

    //  Moving a message onto itself is a no-op. Without this check the
    //  message would be closed and then reinitialised, i.e. its payload
    //  would be silently destroyed.
    if (dest_ == src_)
        return 0;

    zmq_msg_close (dest_);
    *dest_ = *src_;
    zmq_msg_init (src_);
    return 0;
}

177
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    //  Make dest_ refer to the same content as src_. Whatever dest_ held
    //  before is released. The payload itself is not duplicated; heap
    //  content is shared via its reference counter. Always returns 0.

    //  Copying a message onto itself is a no-op. Without this check the
    //  content would be freed by the close below and then dereferenced.
    if (dest_ == src_)
        return 0;

    zmq_msg_close (dest_);

    //  VSMs and delimiters require no special handling.
    if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
          src_->content != (zmq::msg_content_t*) ZMQ_VSM) {

        //  One reference is added to shared messages. Non-shared messages
        //  are turned into shared messages and reference count is set to 2.
        zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
        if (src_->flags & ZMQ_MSG_SHARED)
            content->refcnt.add (1);
        else {
            src_->flags |= ZMQ_MSG_SHARED;
            content->refcnt.set (2);
        }
    }

    //  The structure is copied after the flags were updated so that both
    //  messages carry the shared flag.
    *dest_ = *src_;
    return 0;
}

200
void *zmq_msg_data (zmq_msg_t *msg_)
{
    //  Return a pointer to the payload of the message: the in-structure
    //  buffer for a VSM, NULL for a delimiter, and the data pointer
    //  recorded in the content header otherwise.
    void *payload = NULL;

    if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
        payload = msg_->vsm_data;
    else if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER)
        payload = ((zmq::msg_content_t*) msg_->content)->data;

    return payload;
}

210
size_t zmq_msg_size (zmq_msg_t *msg_)
{
    //  Return the payload size in bytes: the size stored in the structure
    //  for a VSM, zero for a delimiter, and the size recorded in the
    //  content header otherwise.
    size_t length = 0;

    if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
        length = msg_->vsm_size;
    else if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER)
        length = ((zmq::msg_content_t*) msg_->content)->size;

    return length;
}

220
void *zmq_init (int app_threads_, int io_threads_, int flags_)
Martin Sustrik's avatar
Martin Sustrik committed
221
{
222 223 224 225
    //  There should be at least a single application thread managed
    //  by the dispatcher. There's no need for I/O threads if 0MQ is used
    //  only for inproc messaging
    if (app_threads_ < 1 || io_threads_ < 0 ||
226
          app_threads_ > 63 || io_threads_ > 63) {
Martin Sustrik's avatar
Martin Sustrik committed
227 228 229 230
        errno = EINVAL;
        return NULL;
    }

231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
#if defined ZMQ_HAVE_OPENPGM
    //  Unfortunately, OpenPGM doesn't support refcounted init/shutdown, thus,
    //  let's fail if it was initialised beforehand.
    zmq_assert (!pgm_supported ());

    //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
    //  protocol ID. Note that if you want to use gettimeofday and sleep for
    //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
    //  PGM_SLEEP to "USLEEP".
    GError *pgm_error = NULL;
    int rc = pgm_init (&pgm_error);
    if (rc != TRUE) {
        if (pgm_error->domain == PGM_IF_ERROR && (
              pgm_error->code == PGM_IF_ERROR_INVAL ||
              pgm_error->code == PGM_IF_ERROR_XDEV ||
              pgm_error->code == PGM_IF_ERROR_NODEV ||
              pgm_error->code == PGM_IF_ERROR_NOTUNIQ ||
              pgm_error->code == PGM_IF_ERROR_ADDRFAMILY ||
              pgm_error->code == PGM_IF_ERROR_FAMILY ||
              pgm_error->code == PGM_IF_ERROR_NODATA ||
              pgm_error->code == PGM_IF_ERROR_NONAME ||
              pgm_error->code == PGM_IF_ERROR_SERVICE)) {
            g_error_free (pgm_error);
            errno = EINVAL;
            return NULL;
        }
        zmq_assert (false);
    }
#endif

    //  Create 0MQ context.
262 263
    zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t (
        app_threads_, io_threads_, flags_);
264 265
    zmq_assert (dispatcher);
    return (void*) dispatcher;
Martin Sustrik's avatar
Martin Sustrik committed
266 267
}

268
int zmq_term (void *dispatcher_)
Martin Sustrik's avatar
Martin Sustrik committed
269
{
270 271 272 273 274 275 276 277 278 279 280
    int rc = ((zmq::dispatcher_t*) dispatcher_)->term ();
    int en = errno;

#if defined ZMQ_HAVE_OPENPGM
    //  Shut down the OpenPGM library.
    if (pgm_shutdown () != TRUE)
        zmq_assert (false);
#endif

    errno = en;
    return rc;
Martin Sustrik's avatar
Martin Sustrik committed
281 282
}

283
void *zmq_socket (void *dispatcher_, int type_)
Martin Sustrik's avatar
Martin Sustrik committed
284
{
285
    return (void*) (((zmq::dispatcher_t*) dispatcher_)->create_socket (type_));
Martin Sustrik's avatar
Martin Sustrik committed
286 287
}

Martin Sustrik's avatar
Martin Sustrik committed
288
int zmq_close (void *s_)
Martin Sustrik's avatar
Martin Sustrik committed
289
{
290
    ((zmq::socket_base_t*) s_)->close ();
Martin Sustrik's avatar
Martin Sustrik committed
291 292 293
    return 0;
}

294 295
int zmq_setsockopt (void *s_, int option_, const void *optval_,
    size_t optvallen_)
Martin Sustrik's avatar
Martin Sustrik committed
296
{
297 298
    return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_,
        optvallen_));
Martin Sustrik's avatar
Martin Sustrik committed
299 300
}

301
int zmq_bind (void *s_, const char *addr_)
Martin Sustrik's avatar
Martin Sustrik committed
302
{
303
    return (((zmq::socket_base_t*) s_)->bind (addr_));
304 305 306 307
}

int zmq_connect (void *s_, const char *addr_)
{
308
    return (((zmq::socket_base_t*) s_)->connect (addr_));
Martin Sustrik's avatar
Martin Sustrik committed
309 310
}

311
int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
Martin Sustrik's avatar
Martin Sustrik committed
312
{
313
    return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
Martin Sustrik's avatar
Martin Sustrik committed
314 315
}

316
int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
Martin Sustrik's avatar
Martin Sustrik committed
317
{
318
    return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
Martin Sustrik's avatar
Martin Sustrik committed
319
}
320

321
//  Wait for events on a mixed set of 0MQ sockets and raw file descriptors.
//
//  items_/nitems_ describe the pollset. For each item the requested events
//  are read from 'events' and the detected ones are stored in 'revents'.
//  timeout_ is in microseconds: zero means "check and return immediately",
//  a negative value means "wait indefinitely".
//
//  Returns the number of items with at least one event signaled, or zero
//  if the timeout expired (or a finite wait was interrupted). Returns -1
//  and sets errno to:
//    EINVAL  - nitems_ is negative,
//    EFAULT  - the 0MQ sockets in the pollset belong to different
//              application threads,
//    ENOTSUP - the application thread's signaler has no pollable file
//              descriptor, or the platform has no implementation.
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
    defined ZMQ_HAVE_NETBSD

    //  A negative item count would corrupt the allocation size and the
    //  loops below.
    if (nitems_ < 0) {
        errno = EINVAL;
        return -1;
    }

    //  One slot per item is always enough: the extra entry for the signaler
    //  is only needed when at least one item is a 0MQ socket, and sockets
    //  don't consume slots of their own. Allocate at least one slot so that
    //  an empty pollset doesn't depend on what malloc (0) returns.
    pollfd *pollfds = (pollfd*) malloc (
        (nitems_ > 0 ? nitems_ : 1) * sizeof (pollfd));
    zmq_assert (pollfds);
    int npollfds = 0;
    int nsockets = 0;

    zmq::app_thread_t *app_thread = NULL;

    for (int i = 0; i != nitems_; i++) {

        //  0MQ sockets.
        if (items_ [i].socket) {

            //  Get the app_thread the socket is living in. If there are two
            //  sockets in the same pollset with different app threads, fail.
            zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
            if (app_thread) {
                if (app_thread != s->get_thread ()) {
                    free (pollfds);
                    errno = EFAULT;
                    return -1;
                }
            }
            else
                app_thread = s->get_thread ();

            nsockets++;
            continue;
        }

        //  Raw file descriptors. revents is cleared explicitly because it
        //  is read below even if the initial poll gets interrupted.
        pollfds [npollfds].fd = items_ [i].fd;
        pollfds [npollfds].events =
            (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
            (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
        pollfds [npollfds].revents = 0;
        npollfds++;
    }

    //  If there's at least one 0MQ socket in the pollset we have to poll
    //  for 0MQ commands. If ZMQ_POLL was not set, fail.
    if (nsockets) {
        pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd ();
        if (pollfds [npollfds].fd == zmq::retired_fd) {
            free (pollfds);
            errno = ENOTSUP;
            return -1;
        }
        pollfds [npollfds].events = POLLIN;
        pollfds [npollfds].revents = 0;
        npollfds++;
    }

    //  First iteration just check for events, don't block. Waiting would
    //  prevent exiting on any events that may already been signaled on
    //  0MQ sockets.
    int rc = poll (pollfds, npollfds, 0);
    if (rc == -1 && errno == EINTR && timeout_ >= 0) {
        free (pollfds);
        return 0;
    }
    errno_assert (rc >= 0 || (rc == -1 && errno == EINTR));

    //  poll () takes milliseconds, timeout_ is in microseconds.
    int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;
    int nevents = 0;

    while (true) {

        //  Process 0MQ commands if needed. The signaler, when present, is
        //  always the last entry in the pollfd array.
        if (nsockets && pollfds [npollfds -1].revents & POLLIN)
            app_thread->process_commands (false, false);

        //  Check for the events.
        int pollfd_pos = 0;
        for (int i = 0; i != nitems_; i++) {

            //  If the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            if (!items_ [i].socket) {
                items_ [i].revents = 0;
                if (pollfds [pollfd_pos].revents & POLLIN)
                    items_ [i].revents |= ZMQ_POLLIN;
                if (pollfds [pollfd_pos].revents & POLLOUT)
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (pollfds [pollfd_pos].revents & ~(POLLIN | POLLOUT))
                    items_ [i].revents |= ZMQ_POLLERR;

                if (items_ [i].revents)
                    nevents++;
                pollfd_pos++;
                continue;
            }

            //  The poll item is a 0MQ socket.
            zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
            items_ [i].revents = 0;
            if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
                items_ [i].revents |= ZMQ_POLLOUT;
            if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
                items_ [i].revents |= ZMQ_POLLIN;
            if (items_ [i].revents)
                nevents++;
        }

        //  If there's at least one event, or if we are asked not to block,
        //  return immediately.
        if (nevents || !timeout_)
            break;

        //  Wait for events. Ignore interrupts if there's infinite timeout.
        while (true) {
            rc = poll (pollfds, npollfds, timeout);
            if (rc == -1 && errno == EINTR) {
                if (timeout_ < 0)
                    continue;
                else {
                    rc = 0;
                    break;
                }
            }
            errno_assert (rc >= 0);
            break;
        }

        //  If timeout was hit with no events signaled, return zero.
        if (rc == 0)
            break;

        //  If timeout was already applied, we don't want to poll anymore.
        //  Setting timeout to zero will cause termination of the function
        //  once the events we've got are processed.
        if (timeout > 0)
            timeout = 0;
    }

    free (pollfds);
    return nevents;

#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS

    //  A negative item count would never terminate the loops below.
    if (nitems_ < 0) {
        errno = EINVAL;
        return -1;
    }

    fd_set pollset_in;
    FD_ZERO (&pollset_in);
    fd_set pollset_out;
    FD_ZERO (&pollset_out);
    fd_set pollset_err;
    FD_ZERO (&pollset_err);

    zmq::app_thread_t *app_thread = NULL;
    int nsockets = 0;
    zmq::fd_t maxfd = zmq::retired_fd;
    zmq::fd_t notify_fd = zmq::retired_fd;

    for (int i = 0; i != nitems_; i++) {

        //  0MQ sockets.
        if (items_ [i].socket) {

            //  Get the app_thread the socket is living in. If there are two
            //  sockets in the same pollset with different app threads, fail.
            zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
            if (app_thread) {
                if (app_thread != s->get_thread ()) {
                    errno = EFAULT;
                    return -1;
                }
            }
            else
                app_thread = s->get_thread ();

            nsockets++;
            continue;
        }

        //  Raw file descriptors.
        if (items_ [i].events & ZMQ_POLLIN)
            FD_SET (items_ [i].fd, &pollset_in);
        if (items_ [i].events & ZMQ_POLLOUT)
            FD_SET (items_ [i].fd, &pollset_out);
        if (items_ [i].events & ZMQ_POLLERR)
            FD_SET (items_ [i].fd, &pollset_err);
        if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd)
            maxfd = items_ [i].fd;
    }

    //  If there's at least one 0MQ socket in the pollset we have to poll
    //  for 0MQ commands. If ZMQ_POLL was not set, fail.
    if (nsockets) {
        notify_fd = app_thread->get_signaler ()->get_fd ();
        if (notify_fd == zmq::retired_fd) {
            errno = ENOTSUP;
            return -1;
        }
        FD_SET (notify_fd, &pollset_in);
        if (maxfd == zmq::retired_fd || maxfd < notify_fd)
            maxfd = notify_fd;
    }

    bool block = (timeout_ < 0);
    timeval timeout = {timeout_ / 1000000, timeout_ % 1000000};
    timeval zero_timeout = {0, 0};
    int nevents = 0;

    //  First iteration just check for events, don't block. Waiting would
    //  prevent exiting on any events that may already been signaled on
    //  0MQ sockets. Note that select () expects the highest descriptor
    //  plus one as its first argument.
    fd_set inset, outset, errset;
    memcpy (&inset, &pollset_in, sizeof (fd_set));
    memcpy (&outset, &pollset_out, sizeof (fd_set));
    memcpy (&errset, &pollset_err, sizeof (fd_set));
    int rc = select (maxfd + 1, &inset, &outset, &errset, &zero_timeout);
#if defined ZMQ_HAVE_WINDOWS
    wsa_assert (rc != SOCKET_ERROR);
#else
    if (rc == -1 && errno == EINTR && timeout_ >= 0)
        return 0;
    errno_assert (rc >= 0 || (rc == -1 && errno == EINTR));
#endif

    while (true) {

        //  Process 0MQ commands if needed.
        if (nsockets && FD_ISSET (notify_fd, &inset))
            app_thread->process_commands (false, false);

        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {

            //  If the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            if (!items_ [i].socket) {
                items_ [i].revents = 0;
                if (FD_ISSET (items_ [i].fd, &inset))
                    items_ [i].revents |= ZMQ_POLLIN;
                if (FD_ISSET (items_ [i].fd, &outset))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (FD_ISSET (items_ [i].fd, &errset))
                    items_ [i].revents |= ZMQ_POLLERR;
                if (items_ [i].revents)
                    nevents++;
                continue;
            }

            //  The poll item is a 0MQ socket.
            zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
            items_ [i].revents = 0;
            if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
                items_ [i].revents |= ZMQ_POLLOUT;
            if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
                items_ [i].revents |= ZMQ_POLLIN;
            if (items_ [i].revents)
                nevents++;
        }

        //  If there's at least one event, or if we are asked not to block,
        //  return immediately.
        if (nevents || (timeout.tv_sec == 0 && timeout.tv_usec == 0))
            break;

        //  Wait for events. Ignore interrupts if there's infinite timeout.
        //  The result is stored in the outer 'rc' so that the timeout test
        //  after this loop sees the outcome of this very call.
        while (true) {
            memcpy (&inset, &pollset_in, sizeof (fd_set));
            memcpy (&outset, &pollset_out, sizeof (fd_set));
            memcpy (&errset, &pollset_err, sizeof (fd_set));
            rc = select (maxfd + 1, &inset, &outset, &errset,
                block ? NULL : &timeout);
#if defined ZMQ_HAVE_WINDOWS
            wsa_assert (rc != SOCKET_ERROR);
#else
            if (rc == -1 && errno == EINTR) {
                if (timeout_ < 0)
                    continue;
                else {
                    rc = 0;
                    break;
                }
            }
            errno_assert (rc >= 0);
#endif
            break;
        }

        //  If timeout was hit with no events signaled, return zero.
        if (rc == 0)
            break;

        //  If timeout was already applied, we don't want to poll anymore.
        //  Setting timeout to zero will cause termination of the function
        //  once the events we've got are processed.
        if (!block)
            timeout = zero_timeout;
    }

    return nevents;

#else
    errno = ENOTSUP;
    return -1;
#endif
}

626 627 628 629 630
int zmq_errno ()
{
    //  Hand the calling thread's current errno value back to the caller.
    const int current = errno;
    return current;
}

631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686
#if defined ZMQ_HAVE_WINDOWS

static uint64_t now ()
{
    //  Returns the current reading of the high resolution performance
    //  counter converted to microseconds.

    //  Get the high resolution counter's frequency (ticks per second).
    LARGE_INTEGER ticksPerSecond;
    QueryPerformanceFrequency (&ticksPerSecond);

    //  What time is it?
    LARGE_INTEGER tick;
    QueryPerformanceCounter (&tick);

    //  Convert the tick count into microseconds. The division is done in
    //  floating point: integer division would truncate the frequency
    //  (e.g. 3579545 Hz would be treated as 3 ticks per microsecond) and
    //  would yield zero, and thus a division by zero below, for counters
    //  slower than 1 MHz.
    double ticks_per_usec = (double) ticksPerSecond.QuadPart / 1000000.0;
    return (uint64_t) (tick.QuadPart / ticks_per_usec);
}

void zmq_sleep (int seconds_)
{
    //  Sleep () takes milliseconds; the argument is given in seconds.
    const int milliseconds = seconds_ * 1000;
    Sleep (milliseconds);
}

#else

static uint64_t now ()
{
    struct timeval tv;
    int rc;

    rc = gettimeofday (&tv, NULL);
    assert (rc == 0);
    return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec);
}

void zmq_sleep (int seconds_)
{
    //  Suspend the calling thread via sleep (). The return value of
    //  sleep () is deliberately not inspected.
    unsigned int duration = seconds_;
    sleep (duration);
}

#endif

void *zmq_stopwatch_start ()
{
    //  The stopwatch handle is a heap-allocated 64-bit timestamp holding
    //  the value of now () at the moment of the call. It is released by
    //  zmq_stopwatch_stop. Allocation failure is fatal.
    void *handle = malloc (sizeof (uint64_t));
    uint64_t *watch = (uint64_t*) handle;
    zmq_assert (watch);
    *watch = now ();
    return handle;
}

unsigned long zmq_stopwatch_stop (void *watch_)
{
    //  Take the end timestamp first, then compute the difference from the
    //  start timestamp stored in the handle. The handle is freed here and
    //  must not be used afterwards. The result is in the units of now ().
    const uint64_t finished = now ();
    uint64_t *started = (uint64_t*) watch_;
    const uint64_t elapsed = finished - *started;
    free (started);
    return (unsigned long) elapsed;
}