zmq.cpp 28.9 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2012 iMatix Corporation
Martin Sustrik's avatar
Martin Sustrik committed
3
    Copyright (c) 2009-2011 250bpm s.r.o.
4
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
5 6 7 8

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
9
    the terms of the GNU Lesser General Public License as published by
Martin Sustrik's avatar
Martin Sustrik committed
10 11 12 13 14 15
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
    GNU Lesser General Public License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
17

18
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
19 20
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
21
#define ZMQ_TYPE_UNSAFE
Martin Sustrik's avatar
Martin Sustrik committed
22

23 24
#include "platform.hpp"

25 26 27 28 29
#if defined ZMQ_FORCE_SELECT
#define ZMQ_POLL_BASED_ON_SELECT
#elif defined ZMQ_FORCE_POLL
#define ZMQ_POLL_BASED_ON_POLL
#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
30 31 32 33
    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
    defined ZMQ_HAVE_NETBSD
34 35
#define ZMQ_POLL_BASED_ON_POLL
#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS ||\
36
     defined ZMQ_HAVE_CYGWIN
37 38 39 40 41 42 43 44
#define ZMQ_POLL_BASED_ON_SELECT
#endif

//  On AIX platform, poll.h has to be included first to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
#if defined ZMQ_POLL_BASED_ON_POLL
45 46 47
#include <poll.h>
#endif

AJ Lewis's avatar
AJ Lewis committed
48 49 50
// zmq.h must be included *after* poll.h for AIX to build properly
#include "../include/zmq.h"

51
#if defined ZMQ_HAVE_WINDOWS
52 53 54 55 56
#include "windows.hpp"
#else
#include <unistd.h>
#endif

skaller's avatar
skaller committed
57

skaller's avatar
skaller committed
58 59 60 61
// XSI vector I/O
#if ZMQ_HAVE_UIO
#include <sys/uio.h>
#else
62
struct iovec {
skaller's avatar
skaller committed
63 64 65 66 67 68
    void *iov_base;
    size_t iov_len;
};
#endif


69
#include <string.h>
Martin Sustrik's avatar
Martin Sustrik committed
70 71 72
#include <stdlib.h>
#include <new>

73
#include "proxy.hpp"
74
#include "socket_base.hpp"
75
#include "stdint.hpp"
76
#include "config.hpp"
77
#include "likely.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
78
#include "clock.hpp"
79
#include "ctx.hpp"
80
#include "err.hpp"
81
#include "msg.hpp"
82
#include "fd.hpp"
83

84 85 86
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#endif
Martin Sustrik's avatar
Martin Sustrik committed
87

88
#if defined ZMQ_HAVE_OPENPGM
89
#define __PGM_WININT_H__
90 91 92
#include <pgm/pgm.h>
#endif

93 94 95 96
//  Compile time check whether msg_t fits into zmq_msg_t.
typedef char check_msg_t_size
    [sizeof (zmq::msg_t) ==  sizeof (zmq_msg_t) ? 1 : -1];

97

98 99
void zmq_version (int *major_, int *minor_, int *patch_)
{
Martin Sustrik's avatar
Martin Sustrik committed
100 101 102
    *major_ = ZMQ_VERSION_MAJOR;
    *minor_ = ZMQ_VERSION_MINOR;
    *patch_ = ZMQ_VERSION_PATCH;
103 104
}

105

106 107
const char *zmq_strerror (int errnum_)
{
108
    return zmq::errno_to_string (errnum_);
109 110
}

111 112 113 114 115 116
int zmq_errno ()
{
    return errno;
}


117
//  New context API
Martin Sustrik's avatar
Martin Sustrik committed
118

119 120
void *zmq_ctx_new (void)
{
121 122 123 124 125 126
#if defined ZMQ_HAVE_OPENPGM

    //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
    //  protocol ID. Note that if you want to use gettimeofday and sleep for
    //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
    //  PGM_SLEEP to "USLEEP".
Steven McCoy's avatar
Steven McCoy committed
127
    pgm_error_t *pgm_error = NULL;
128 129
    const bool ok = pgm_init (&pgm_error);
    if (ok != TRUE) {
130 131 132 133 134 135 136

        //  Invalid parameters don't set pgm_error_t
        zmq_assert (pgm_error != NULL);
        if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
              pgm_error->code == PGM_ERROR_FAILED)) {

            //  Failed to access RTC or HPET device.
Steven McCoy's avatar
Steven McCoy committed
137
            pgm_error_free (pgm_error);
138 139 140
            errno = EINVAL;
            return NULL;
        }
141 142

        //  PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
143 144 145 146
        zmq_assert (false);
    }
#endif

147 148 149 150 151 152 153 154 155 156 157 158 159
#ifdef ZMQ_HAVE_WINDOWS
    //  Intialise Windows sockets. Note that WSAStartup can be called multiple
    //  times given that WSACleanup will be called for each WSAStartup.
   //  We do this before the ctx constructor since its embedded mailbox_t
   //  object needs Winsock to be up and running.
    WORD version_requested = MAKEWORD (2, 2);
    WSADATA wsa_data;
    int rc = WSAStartup (version_requested, &wsa_data);
    zmq_assert (rc == 0);
    zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
        HIBYTE (wsa_data.wVersion) == 2);
#endif

160
    //  Create 0MQ context.
161
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
162
    alloc_assert (ctx);
163 164 165
    return ctx;
}

166
int zmq_ctx_destroy (void *ctx_)
Martin Sustrik's avatar
Martin Sustrik committed
167
{
168
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
169 170 171
        errno = EFAULT;
        return -1;
    }
172

173 174 175
    int rc = ((zmq::ctx_t*) ctx_)->terminate ();
    int en = errno;

176 177
    //  Shut down only if termination was not interrupted by a signal.
    if (!rc || en != EINTR) {
178
#ifdef ZMQ_HAVE_WINDOWS
179 180 181
        //  On Windows, uninitialise socket layer.
        rc = WSACleanup ();
        wsa_assert (rc != SOCKET_ERROR);
182 183
#endif

184
#if defined ZMQ_HAVE_OPENPGM
185 186 187
        //  Shut down the OpenPGM library.
        if (pgm_shutdown () != TRUE)
            zmq_assert (false);
188
#endif
189
    }
190 191 192

    errno = en;
    return rc;
Martin Sustrik's avatar
Martin Sustrik committed
193 194
}

195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
int zmq_ctx_set (void *ctx_, int option_, int optval_)
{
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
        errno = EFAULT;
        return -1;
    }
    return ((zmq::ctx_t*) ctx_)->set (option_, optval_);
}

int zmq_ctx_get (void *ctx_, int option_)
{
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
        errno = EFAULT;
        return -1;
    }
    return ((zmq::ctx_t*) ctx_)->get (option_);
}

//  Stable/legacy context API

void *zmq_init (int io_threads_)
{
217 218 219 220 221 222 223
    if (io_threads_ >= 0) {
        void *ctx = zmq_ctx_new ();
        zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
        return ctx;
    }
    errno = EINVAL;
    return NULL;   
224 225 226 227 228 229 230 231 232
}

int zmq_term (void *ctx_)
{
    return zmq_ctx_destroy (ctx_);
}


// Sockets
233

234
void *zmq_socket (void *ctx_, int type_)
Martin Sustrik's avatar
Martin Sustrik committed
235
{
236
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
237 238 239
        errno = EFAULT;
        return NULL;
    }
240 241
    zmq::ctx_t *ctx = (zmq::ctx_t*) ctx_;
    zmq::socket_base_t *s = ctx->create_socket (type_);
242
    return (void *) s;
Martin Sustrik's avatar
Martin Sustrik committed
243 244
}

Martin Sustrik's avatar
Martin Sustrik committed
245
int zmq_close (void *s_)
Martin Sustrik's avatar
Martin Sustrik committed
246
{
247 248
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
249 250
        return -1;
    }
251
    ((zmq::socket_base_t*) s_)->close ();
Martin Sustrik's avatar
Martin Sustrik committed
252 253 254
    return 0;
}

255 256
int zmq_setsockopt (void *s_, int option_, const void *optval_,
    size_t optvallen_)
Martin Sustrik's avatar
Martin Sustrik committed
257
{
258 259
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
260 261
        return -1;
    }
skaller's avatar
skaller committed
262 263 264
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->setsockopt (option_, optval_, optvallen_);
    return result;
Martin Sustrik's avatar
Martin Sustrik committed
265 266
}

267 268
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
269 270
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
271 272
        return -1;
    }
skaller's avatar
skaller committed
273 274 275
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->getsockopt (option_, optval_, optvallen_);
    return result;
276 277
}

278 279 280 281 282 283 284 285 286 287 288
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->monitor (addr_, events_);
    return result;
}

289
int zmq_bind (void *s_, const char *addr_)
Martin Sustrik's avatar
Martin Sustrik committed
290
{
291 292
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
293 294
        return -1;
    }
skaller's avatar
skaller committed
295 296 297
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->bind (addr_);
    return result;
298 299 300 301
}

int zmq_connect (void *s_, const char *addr_)
{
302 303
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
304 305
        return -1;
    }
skaller's avatar
skaller committed
306 307 308 309 310
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->connect (addr_);
    return result;
}

311
int zmq_unbind (void *s_, const char *addr_)
312 313 314 315 316 317
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
318
    return s->term_endpoint (addr_);
319 320
}

321
int zmq_disconnect (void *s_, const char *addr_)
322 323 324 325 326 327
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
328
    return s->term_endpoint (addr_);
329 330
}

331 332
// Sending functions.

333 334
static int
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
skaller's avatar
skaller committed
335 336 337 338 339 340 341 342
{
    int sz = (int) zmq_msg_size (msg_);
    int rc = s_->send ((zmq::msg_t*) msg_, flags_);
    if (unlikely (rc < 0))
        return -1;
    return sz;
}

343
/*  To be deprecated once zmq_msg_send() is stable                           */
skaller's avatar
skaller committed
344 345
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
346
    return zmq_msg_send (msg_, s_, flags_);
Martin Sustrik's avatar
Martin Sustrik committed
347 348
}

349 350
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
{
skaller's avatar
skaller committed
351 352 353 354
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
355 356 357 358 359 360
    zmq_msg_t msg;
    int rc = zmq_msg_init_size (&msg, len_);
    if (rc != 0)
        return -1;
    memcpy (zmq_msg_data (&msg), buf_, len_);

skaller's avatar
skaller committed
361
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
362
    rc = s_sendmsg (s, &msg, flags_);
363 364 365 366 367 368 369 370 371 372 373 374 375
    if (unlikely (rc < 0)) {
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }
    
    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
    return rc;
}

skaller's avatar
skaller committed
376 377 378 379
// Send multiple messages.
//
// If flag bit ZMQ_SNDMORE is set the vector is treated as
// a single multi-part message, i.e. the last message has
skaller's avatar
skaller committed
380
// ZMQ_SNDMORE bit switched off.
skaller's avatar
skaller committed
381
//
382
int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
skaller's avatar
skaller committed
383 384 385 386 387 388 389 390
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    int rc = 0;
    zmq_msg_t msg;
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
391 392
    
    for (size_t i = 0; i < count_; ++i) {
skaller's avatar
skaller committed
393
        rc = zmq_msg_init_size (&msg, a_[i].iov_len);
394
        if (rc != 0) {
skaller's avatar
skaller committed
395 396 397 398
            rc = -1;
            break;
        }
        memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
399 400 401
        if (i == count_ - 1)
            flags_ = flags_ & ~ZMQ_SNDMORE;
        rc = s_sendmsg (s, &msg, flags_);
skaller's avatar
skaller committed
402 403 404 405 406 407 408 409 410 411 412 413
        if (unlikely (rc < 0)) {
           int err = errno;
           int rc2 = zmq_msg_close (&msg);
           errno_assert (rc2 == 0);
           errno = err;
           rc = -1;
           break;
        }
    }
    return rc; 
}

414 415
// Receiving functions.

416 417
static int
s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
skaller's avatar
skaller committed
418
{
419
    int rc = s_->recv ((zmq::msg_t*) msg_, flags_);
skaller's avatar
skaller committed
420 421 422 423 424
    if (unlikely (rc < 0))
        return -1;
    return (int) zmq_msg_size (msg_);
}

425
/*  To be deprecated once zmq_msg_recv() is stable                           */
skaller's avatar
skaller committed
426 427
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
428
    return zmq_msg_recv (msg_, s_, flags_);
skaller's avatar
skaller committed
429 430 431
}


432 433
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
{
skaller's avatar
skaller committed
434 435 436 437
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
438 439 440 441
    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg);
    errno_assert (rc == 0);

skaller's avatar
skaller committed
442
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
443
    int nbytes = s_recvmsg (s, &msg, flags_);
444
    if (unlikely (nbytes < 0)) {
445
        int err = errno;
446 447
        rc = zmq_msg_close (&msg);
        errno_assert (rc == 0);
448 449 450 451 452 453
        errno = err;
        return -1;
    }

    //  At the moment an oversized message is silently truncated.
    //  TODO: Build in a notification mechanism to report the overflows.
454
    size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
455
    memcpy (buf_, zmq_msg_data (&msg), to_copy);
456 457 458 459

    rc = zmq_msg_close (&msg);
    errno_assert (rc == 0);

460
    return nbytes;
461 462
}

skaller's avatar
skaller committed
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
// Receive a multi-part message
// 
// Receives up to *count_ parts of a multi-part message.
// Sets *count_ to the actual number of parts read.
// ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
// Returns number of message parts read, or -1 on error.
//
// Note: even if -1 is returned, some parts of the message
// may have been read. Therefore the client must consult
// *count_ to retrieve message parts successfully read,
// even if -1 is returned.
//
// The iov_base* buffers of each iovec *a_ filled in by this 
// function may be freed using free().
//
// Implementation note: We assume zmq::msg_t buffer allocated
// by zmq::recvmsg can be freed by free().
// We assume it is safe to steal these buffers by simply
// not closing the zmq::msg_t.
//
483
int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
skaller's avatar
skaller committed
484 485 486 487 488 489 490
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;

491
    size_t count = *count_;
skaller's avatar
skaller committed
492 493
    int nread = 0;
    bool recvmore = true;
494 495
    
    *count_ = 0;
skaller's avatar
skaller committed
496

497
    for (size_t i = 0; recvmore && i < count; ++i) {
skaller's avatar
skaller committed
498 499 500 501 502 503
        // Cheat! We never close any msg
        // because we want to steal the buffer.
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        errno_assert (rc == 0);

504
        int nbytes = s_recvmsg (s, &msg, flags_);
skaller's avatar
skaller committed
505 506 507 508 509 510 511 512
        if (unlikely (nbytes < 0)) {
            int err = errno;
            rc = zmq_msg_close (&msg);
            errno_assert (rc == 0);
            errno = err;
            nread = -1;
            break;
        }
513
        ++*count_;
skaller's avatar
skaller committed
514 515 516
        ++nread;

        // Cheat: acquire zmq_msg buffer.
517
        a_[i].iov_base = static_cast<char *> (zmq_msg_data (&msg));
skaller's avatar
skaller committed
518 519 520
        a_[i].iov_len = zmq_msg_size (&msg);

        // Assume zmq_socket ZMQ_RVCMORE is properly set.
521
        recvmore = ((zmq::msg_t*) (void *) &msg)->flags () & zmq::msg_t::more;
skaller's avatar
skaller committed
522
    }
523
    return nread;
skaller's avatar
skaller committed
524 525
}

526 527
// Message manipulators.

528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543
int zmq_msg_init (zmq_msg_t *msg_)
{
    return ((zmq::msg_t*) msg_)->init ();
}

int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
    return ((zmq::msg_t*) msg_)->init_size (size_);
}

int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
    zmq_free_fn *ffn_, void *hint_)
{
    return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_);
}

544 545 546 547 548 549 550
int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
Pieter Hintjens's avatar
Pieter Hintjens committed
551
    int result = s_sendmsg (s, msg_, flags_);
552 553 554 555 556 557 558 559 560 561
    return result;
}

int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
Pieter Hintjens's avatar
Pieter Hintjens committed
562
    int result = s_recvmsg (s, msg_, flags_);
563 564 565
    return result;
}

566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585
int zmq_msg_close (zmq_msg_t *msg_)
{
    return ((zmq::msg_t*) msg_)->close ();
}

int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    return ((zmq::msg_t*) dest_)->move (*(zmq::msg_t*) src_);
}

int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    return ((zmq::msg_t*) dest_)->copy (*(zmq::msg_t*) src_);
}

void *zmq_msg_data (zmq_msg_t *msg_)
{
    return ((zmq::msg_t*) msg_)->data ();
}

586
size_t zmq_msg_size (zmq_msg_t *msg_)
587 588 589 590
{
    return ((zmq::msg_t*) msg_)->size ();
}

591 592
int zmq_msg_more (zmq_msg_t *msg_)
{
593
    return zmq_msg_get (msg_, ZMQ_MORE);
594 595
}

596
int zmq_msg_get (zmq_msg_t *msg_, int option_)
597 598
{
    switch (option_) {
599
        case ZMQ_MORE:
600
            return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more)? 1: 0;
601
        default:
602 603
            errno = EINVAL;
            return -1;
604 605 606
    }
}

607
int zmq_msg_set (zmq_msg_t *, int, int)
608 609 610 611
{
    //  No options supported at present
    errno = EINVAL;
    return -1;
612 613
}

614 615
// Polling.

616 617 618
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
#if defined ZMQ_POLL_BASED_ON_POLL
619 620 621 622 623 624 625 626
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
627
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
628
        return 0;
Mikko Koppanen's avatar
Mikko Koppanen committed
629 630 631
#elif defined ZMQ_HAVE_ANDROID
        usleep (timeout_ * 1000);
        return 0;
632
#else
633
        return usleep (timeout_ * 1000);
634 635
#endif
    }
636 637 638 639 640 641

    if (!items_) {
        errno = EFAULT;
        return -1;
    }

642 643 644 645
    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;

646
    pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
647
    alloc_assert (pollfds);
648

649
    //  Build pollset for poll () system call.
650 651
    for (int i = 0; i != nitems_; i++) {

652 653
        //  If the poll item is a 0MQ socket, we poll on the file descriptor
        //  retrieved by the ZMQ_FD socket option.
654
        if (items_ [i].socket) {
655
            size_t zmq_fd_size = sizeof (zmq::fd_t);
656 657 658 659
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
                &zmq_fd_size) == -1) {
                free (pollfds);
                return -1;
660
            }
661
            pollfds [i].events = items_ [i].events ? POLLIN : 0;
662
        }
663 664
        //  Else, the poll item is a raw file descriptor. Just convert the
        //  events to normal POLLIN/POLLOUT for poll ().
Martin Lucina's avatar
Martin Lucina committed
665
        else {
666 667 668 669
            pollfds [i].fd = items_ [i].fd;
            pollfds [i].events =
                (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
                (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
Martin Lucina's avatar
Martin Lucina committed
670
        }
671 672
    }

673
    bool first_pass = true;
674
    int nevents = 0;
675

676
    while (true) {
677 678 679 680 681 682 683 684 685
        //  Compute the timeout for the subsequent poll.
        int timeout;
        if (first_pass)
            timeout = 0;
        else
        if (timeout_ < 0)
            timeout = -1;
        else
            timeout = end - now;
686

687
        //  Wait for events.
688
        while (true) {
689
            int rc = poll (pollfds, nitems_, timeout);
690
            if (rc == -1 && errno == EINTR) {
691 692
                free (pollfds);
                return -1;
693
            }
694 695
            errno_assert (rc >= 0);
            break;
696
        }
697 698
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
699

700
            items_ [i].revents = 0;
701

702 703
            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
704
            if (items_ [i].socket) {
705 706 707 708 709 710 711 712 713 714 715 716 717
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                    &zmq_events_size) == -1) {
                    free (pollfds);
                    return -1;
                }
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
718
            }
719 720 721 722 723 724 725 726 727 728 729 730 731
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (pollfds [i].revents & POLLIN)
                    items_ [i].revents |= ZMQ_POLLIN;
                if (pollfds [i].revents & POLLOUT)
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (pollfds [i].revents & ~(POLLIN | POLLOUT))
                    items_ [i].revents |= ZMQ_POLLERR;
            }

            if (items_ [i].revents)
                nevents++;
732
        }
733

734 735 736
        //  If timout is zero, exit immediately whether there are events or not.
        if (timeout_ == 0)
            break;
737

738 739 740 741 742 743
        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
744 745 746
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
747
            continue;
748
        }
749

750 751 752 753 754 755
        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling have begun. (We assume that
        //  first pass have taken negligible time). We also compute the time
        //  when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
756
            end = now + timeout_;
757 758 759
            if (now == end)
                break;
            first_pass = false;
760 761
            continue;
        }
762

763 764 765 766
        //  Find out whether timeout have expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
767 768 769 770 771
    }

    free (pollfds);
    return nevents;

772
#elif defined ZMQ_POLL_BASED_ON_SELECT
773

774 775 776 777 778 779 780 781
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
782
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
783 784
        return 0;
#else
785
        return usleep (timeout_ * 1000);
786 787
#endif
    }
788 789 790 791 792 793 794 795
    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;

    //  Ensure we do not attempt to select () on more than FD_SETSIZE
    //  file descriptors.
    zmq_assert (nitems_ <= FD_SETSIZE);

796 797 798 799 800 801 802
    fd_set pollset_in;
    FD_ZERO (&pollset_in);
    fd_set pollset_out;
    FD_ZERO (&pollset_out);
    fd_set pollset_err;
    FD_ZERO (&pollset_err);

803
    zmq::fd_t maxfd = 0;
804

805
    //  Build the fd_sets for passing to select ().
806 807
    for (int i = 0; i != nitems_; i++) {

808 809
        //  If the poll item is a 0MQ socket we are interested in input on the
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
810
        if (items_ [i].socket) {
811 812 813 814 815
            size_t zmq_fd_size = sizeof (zmq::fd_t);
            zmq::fd_t notify_fd;
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
                &zmq_fd_size) == -1)
                return -1;
816 817 818 819 820
            if (items_ [i].events) {
                FD_SET (notify_fd, &pollset_in);
                if (maxfd < notify_fd)
                    maxfd = notify_fd;
            }
821
        }
822 823 824 825 826 827 828 829 830 831 832
        //  Else, the poll item is a raw file descriptor. Convert the poll item
        //  events to the appropriate fd_sets.
        else {
            if (items_ [i].events & ZMQ_POLLIN)
                FD_SET (items_ [i].fd, &pollset_in);
            if (items_ [i].events & ZMQ_POLLOUT)
                FD_SET (items_ [i].fd, &pollset_out);
            if (items_ [i].events & ZMQ_POLLERR)
                FD_SET (items_ [i].fd, &pollset_err);
            if (maxfd < items_ [i].fd)
                maxfd = items_ [i].fd;
833 834 835
        }
    }

836
    bool first_pass = true;
837
    int nevents = 0;
838
    fd_set inset, outset, errset;
839 840

    while (true) {
841

842 843 844 845 846 847 848 849
        //  Compute the timeout for the subsequent poll.
        timeval timeout;
        timeval *ptimeout;
        if (first_pass) {
            timeout.tv_sec = 0;
            timeout.tv_usec = 0;
            ptimeout = &timeout;
        }
850 851
        else
        if (timeout_ < 0)
852 853 854 855 856 857 858
            ptimeout = NULL;
        else {
            timeout.tv_sec = (long) ((end - now) / 1000);
            timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
            ptimeout = &timeout;
        }

859 860 861 862 863
        //  Wait for events. Ignore interrupts if there's infinite timeout.
        while (true) {
            memcpy (&inset, &pollset_in, sizeof (fd_set));
            memcpy (&outset, &pollset_out, sizeof (fd_set));
            memcpy (&errset, &pollset_err, sizeof (fd_set));
864
#if defined ZMQ_HAVE_WINDOWS
865
            int rc = select (0, &inset, &outset, &errset, ptimeout);
866
            if (unlikely (rc == SOCKET_ERROR)) {
867 868 869
                errno = zmq::wsa_error_to_errno (WSAGetLastError ());
                wsa_assert (errno == ENOTSOCK);
                return -1;
870
            }
871
#else
872
            int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
Bernd Prager's avatar
Bernd Prager committed
873
            if (unlikely (rc == -1)) {
874 875
                errno_assert (errno == EINTR || errno == EBADF);
                return -1;
876
            }
877
#endif
878 879
            break;
        }
880

881 882
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
883

884
            items_ [i].revents = 0;
885

886 887 888
            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
            if (items_ [i].socket) {
889 890 891 892
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                      &zmq_events_size) == -1)
893
                    return -1;
894 895 896 897 898 899
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
900 901 902 903 904
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (FD_ISSET (items_ [i].fd, &inset))
905
                    items_ [i].revents |= ZMQ_POLLIN;
906 907 908 909
                if (FD_ISSET (items_ [i].fd, &outset))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (FD_ISSET (items_ [i].fd, &errset))
                    items_ [i].revents |= ZMQ_POLLERR;
910
            }
911 912 913

            if (items_ [i].revents)
                nevents++;
914
        }
915

916 917 918
        //  If timout is zero, exit immediately whether there are events or not.
        if (timeout_ == 0)
            break;
919

920 921 922 923 924 925
        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
926 927 928
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
929
            continue;
930
        }
931

932 933 934 935 936 937
        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling have begun. (We assume that
        //  first pass have taken negligible time). We also compute the time
        //  when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
938
            end = now + timeout_;
939 940 941
            if (now == end)
                break;
            first_pass = false;
942 943
            continue;
        }
944

945 946 947 948
        //  Find out whether timeout have expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
949 950 951 952 953
    }

    return nevents;

#else
954
    //  Exotic platforms that support neither poll() nor select().
955 956
    errno = ENOTSUP;
    return -1;
957 958 959
#endif
}

960 961 962 963 964 965
#if defined ZMQ_POLL_BASED_ON_SELECT
#undef ZMQ_POLL_BASED_ON_SELECT
#endif
#if defined ZMQ_POLL_BASED_ON_POLL
#undef ZMQ_POLL_BASED_ON_POLL
#endif
Pieter Hintjens's avatar
Pieter Hintjens committed
966

967 968 969
//  The proxy functionality

int zmq_proxy (void *frontend_, void *backend_, void *control_)
Pieter Hintjens's avatar
Pieter Hintjens committed
970
{
971
    if (!frontend_ || !backend_) {
Pieter Hintjens's avatar
Pieter Hintjens committed
972 973 974
        errno = EFAULT;
        return -1;
    }
975 976 977 978 979
    return zmq::proxy (
        (zmq::socket_base_t*) frontend_,
        (zmq::socket_base_t*) backend_,
        (zmq::socket_base_t*) control_);
}
Pieter Hintjens's avatar
Pieter Hintjens committed
980

981
//  The deprecated device functionality
Pieter Hintjens's avatar
Pieter Hintjens committed
982

983 984 985 986 987
int zmq_device (int type, void *frontend_, void *backend_)
{
    return zmq::proxy (
        (zmq::socket_base_t*) frontend_,
        (zmq::socket_base_t*) backend_, NULL);
Pieter Hintjens's avatar
Pieter Hintjens committed
988 989
}

990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030
//  Callback to free socket event data

void zmq_free_event (void *event_data, void *hint)
{
    zmq_event_t *event = (zmq_event_t *) event_data;

    switch (event->event) {
    case ZMQ_EVENT_CONNECTED:
        free (event->data.connected.addr);
        break;
    case ZMQ_EVENT_CONNECT_DELAYED:
        free (event->data.connect_delayed.addr);
        break;
    case ZMQ_EVENT_CONNECT_RETRIED:
        free (event->data.connect_retried.addr);
        break;
    case ZMQ_EVENT_LISTENING:
        free (event->data.listening.addr);
        break;
    case ZMQ_EVENT_BIND_FAILED:
        free (event->data.bind_failed.addr);
        break;
    case ZMQ_EVENT_ACCEPTED:
        free (event->data.accepted.addr);
        break;
    case ZMQ_EVENT_ACCEPT_FAILED:
        free (event->data.accept_failed.addr);
        break;
    case ZMQ_EVENT_CLOSED:
        free (event->data.closed.addr);
        break;
    case ZMQ_EVENT_CLOSE_FAILED:
        free (event->data.close_failed.addr);
        break;
    case ZMQ_EVENT_DISCONNECTED:
        free (event->data.disconnected.addr);
        break;
    }
    free (event_data);
}

Pieter Hintjens's avatar
Pieter Hintjens committed
1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058
////////////////////////////////////////////////////////////////////////////////
//  0MQ utils - to be used by perf tests
////////////////////////////////////////////////////////////////////////////////

void zmq_sleep (int seconds_)
{
#if defined ZMQ_HAVE_WINDOWS
    Sleep (seconds_ * 1000);
#else
    sleep (seconds_);
#endif
}

void *zmq_stopwatch_start ()
{
    uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t));
    alloc_assert (watch);
    *watch = zmq::clock_t::now_us ();
    return (void*) watch;
}

unsigned long zmq_stopwatch_stop (void *watch_)
{
    uint64_t end = zmq::clock_t::now_us ();
    uint64_t start = *(uint64_t*) watch_;
    free (watch_);
    return (unsigned long) (end - start);
}