zmq.cpp 28.3 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3 4 5 6

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
7
    the terms of the GNU Lesser General Public License as published by
Martin Sustrik's avatar
Martin Sustrik committed
8 9 10 11 12 13
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
    GNU Lesser General Public License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
15

16
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
17 18
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
19
#define ZMQ_TYPE_UNSAFE
Martin Sustrik's avatar
Martin Sustrik committed
20

21 22
#include "platform.hpp"

23 24 25 26 27
#if defined ZMQ_FORCE_SELECT
#define ZMQ_POLL_BASED_ON_SELECT
#elif defined ZMQ_FORCE_POLL
#define ZMQ_POLL_BASED_ON_POLL
#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
28 29 30 31
    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
    defined ZMQ_HAVE_NETBSD
32 33
#define ZMQ_POLL_BASED_ON_POLL
#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS ||\
34
     defined ZMQ_HAVE_CYGWIN
35 36 37 38 39 40 41 42
#define ZMQ_POLL_BASED_ON_SELECT
#endif

//  On AIX platform, poll.h has to be included first to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
#if defined ZMQ_POLL_BASED_ON_POLL
43 44 45
#include <poll.h>
#endif

AJ Lewis's avatar
AJ Lewis committed
46 47 48
// zmq.h must be included *after* poll.h for AIX to build properly
#include "../include/zmq.h"

49
#if defined ZMQ_HAVE_WINDOWS
50 51 52 53 54
#include "windows.hpp"
#else
#include <unistd.h>
#endif

skaller's avatar
skaller committed
55

skaller's avatar
skaller committed
56
// XSI vector I/O
57
#if defined ZMQ_HAVE_UIO
skaller's avatar
skaller committed
58 59
#include <sys/uio.h>
#else
60
//  Fallback definition used when the platform does not provide <sys/uio.h>.
//  Describes a single buffer as an address plus a length in bytes.
struct iovec
{
    //  Address of the first byte of the buffer.
    void *iov_base;

    //  Length of the buffer in bytes.
    size_t iov_len;
};
#endif


67
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include <new>

71
#include "proxy.hpp"
72
#include "socket_base.hpp"
73
#include "stdint.hpp"
74
#include "config.hpp"
75
#include "likely.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
76
#include "clock.hpp"
77
#include "ctx.hpp"
78
#include "err.hpp"
79
#include "msg.hpp"
80
#include "fd.hpp"
81

82 83 84
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#endif
Martin Sustrik's avatar
Martin Sustrik committed
85

86
#if defined ZMQ_HAVE_OPENPGM
87
#define __PGM_WININT_H__
88 89 90
#include <pgm/pgm.h>
#endif

91 92 93 94
//  Compile time check whether msg_t fits into zmq_msg_t. The functions in
//  this file cast zmq_msg_t pointers directly to zmq::msg_t pointers, so the
//  two sizes are required to be equal. If they differ, the array below gets
//  a negative size and the translation unit fails to compile.
typedef char check_msg_t_size
    [sizeof (zmq::msg_t) ==  sizeof (zmq_msg_t) ? 1 : -1];

95

96 97
void zmq_version (int *major_, int *minor_, int *patch_)
{
Martin Sustrik's avatar
Martin Sustrik committed
98 99 100
    *major_ = ZMQ_VERSION_MAJOR;
    *minor_ = ZMQ_VERSION_MINOR;
    *patch_ = ZMQ_VERSION_PATCH;
101 102
}

103

104 105
const char *zmq_strerror (int errnum_)
{
106
    return zmq::errno_to_string (errnum_);
107 108
}

109
//  Returns the current value of errno as seen by the library.
int zmq_errno (void)
{
    const int current = errno;
    return current;
}


115
//  New context API
Martin Sustrik's avatar
Martin Sustrik committed
116

117 118
void *zmq_ctx_new (void)
{
119 120 121 122 123 124
#if defined ZMQ_HAVE_OPENPGM

    //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
    //  protocol ID. Note that if you want to use gettimeofday and sleep for
    //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
    //  PGM_SLEEP to "USLEEP".
Steven McCoy's avatar
Steven McCoy committed
125
    pgm_error_t *pgm_error = NULL;
126 127
    const bool ok = pgm_init (&pgm_error);
    if (ok != TRUE) {
128 129 130 131 132 133 134

        //  Invalid parameters don't set pgm_error_t
        zmq_assert (pgm_error != NULL);
        if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
              pgm_error->code == PGM_ERROR_FAILED)) {

            //  Failed to access RTC or HPET device.
Steven McCoy's avatar
Steven McCoy committed
135
            pgm_error_free (pgm_error);
136 137 138
            errno = EINVAL;
            return NULL;
        }
139 140

        //  PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
141 142 143 144
        zmq_assert (false);
    }
#endif

145 146 147 148 149 150 151 152 153 154 155 156 157
#ifdef ZMQ_HAVE_WINDOWS
    //  Intialise Windows sockets. Note that WSAStartup can be called multiple
    //  times given that WSACleanup will be called for each WSAStartup.
   //  We do this before the ctx constructor since its embedded mailbox_t
   //  object needs Winsock to be up and running.
    WORD version_requested = MAKEWORD (2, 2);
    WSADATA wsa_data;
    int rc = WSAStartup (version_requested, &wsa_data);
    zmq_assert (rc == 0);
    zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
        HIBYTE (wsa_data.wVersion) == 2);
#endif

158
    //  Create 0MQ context.
159
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
160
    alloc_assert (ctx);
161 162 163
    return ctx;
}

164
int zmq_ctx_term (void *ctx_)
Martin Sustrik's avatar
Martin Sustrik committed
165
{
166
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
167 168 169
        errno = EFAULT;
        return -1;
    }
170

171 172 173
    int rc = ((zmq::ctx_t*) ctx_)->terminate ();
    int en = errno;

174 175
    //  Shut down only if termination was not interrupted by a signal.
    if (!rc || en != EINTR) {
176
#ifdef ZMQ_HAVE_WINDOWS
177 178 179
        //  On Windows, uninitialise socket layer.
        rc = WSACleanup ();
        wsa_assert (rc != SOCKET_ERROR);
180 181
#endif

182
#if defined ZMQ_HAVE_OPENPGM
183 184 185
        //  Shut down the OpenPGM library.
        if (pgm_shutdown () != TRUE)
            zmq_assert (false);
186
#endif
187
    }
188 189 190

    errno = en;
    return rc;
Martin Sustrik's avatar
Martin Sustrik committed
191 192
}

193 194 195 196 197 198 199 200 201 202
int zmq_ctx_shutdown (void *ctx_)
{
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
        errno = EFAULT;
        return -1;
    }

    return ((zmq::ctx_t*) ctx_)->shutdown ();
}

203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
int zmq_ctx_set (void *ctx_, int option_, int optval_)
{
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
        errno = EFAULT;
        return -1;
    }
    return ((zmq::ctx_t*) ctx_)->set (option_, optval_);
}

int zmq_ctx_get (void *ctx_, int option_)
{
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
        errno = EFAULT;
        return -1;
    }
    return ((zmq::ctx_t*) ctx_)->get (option_);
}

//  Stable/legacy context API

void *zmq_init (int io_threads_)
{
225 226 227 228 229 230 231
    if (io_threads_ >= 0) {
        void *ctx = zmq_ctx_new ();
        zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
        return ctx;
    }
    errno = EINVAL;
    return NULL;   
232 233 234 235
}

int zmq_term (void *ctx_)
{
236 237 238 239 240 241
    return zmq_ctx_term (ctx_);
}

//  Alternative name for zmq_ctx_term (); returns its result unchanged.
int zmq_ctx_destroy (void *ctx_)
{
    const int rc = zmq_ctx_term (ctx_);
    return rc;
}


// Sockets
246

247
void *zmq_socket (void *ctx_, int type_)
Martin Sustrik's avatar
Martin Sustrik committed
248
{
249
    if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
250 251 252
        errno = EFAULT;
        return NULL;
    }
253 254
    zmq::ctx_t *ctx = (zmq::ctx_t*) ctx_;
    zmq::socket_base_t *s = ctx->create_socket (type_);
255
    return (void *) s;
Martin Sustrik's avatar
Martin Sustrik committed
256 257
}

Martin Sustrik's avatar
Martin Sustrik committed
258
int zmq_close (void *s_)
Martin Sustrik's avatar
Martin Sustrik committed
259
{
260 261
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
262 263
        return -1;
    }
264
    ((zmq::socket_base_t*) s_)->close ();
Martin Sustrik's avatar
Martin Sustrik committed
265 266 267
    return 0;
}

268 269
int zmq_setsockopt (void *s_, int option_, const void *optval_,
    size_t optvallen_)
Martin Sustrik's avatar
Martin Sustrik committed
270
{
271 272
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
273 274
        return -1;
    }
skaller's avatar
skaller committed
275 276 277
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->setsockopt (option_, optval_, optvallen_);
    return result;
Martin Sustrik's avatar
Martin Sustrik committed
278 279
}

280 281
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
282 283
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
284 285
        return -1;
    }
skaller's avatar
skaller committed
286 287 288
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->getsockopt (option_, optval_, optvallen_);
    return result;
289 290
}

291 292 293 294 295 296 297 298 299 300 301
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->monitor (addr_, events_);
    return result;
}

302
int zmq_bind (void *s_, const char *addr_)
Martin Sustrik's avatar
Martin Sustrik committed
303
{
304 305
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
306 307
        return -1;
    }
skaller's avatar
skaller committed
308 309 310
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->bind (addr_);
    return result;
311 312 313 314
}

int zmq_connect (void *s_, const char *addr_)
{
315 316
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
317 318
        return -1;
    }
skaller's avatar
skaller committed
319 320 321 322 323
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->connect (addr_);
    return result;
}

324
int zmq_unbind (void *s_, const char *addr_)
325 326 327 328 329 330
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
331
    return s->term_endpoint (addr_);
332 333
}

334
int zmq_disconnect (void *s_, const char *addr_)
335 336 337 338 339 340
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
341
    return s->term_endpoint (addr_);
342 343
}

344 345
// Sending functions.

346 347
//  Sends msg_ on the socket s_ with the given flags.
//  Returns -1 if the send fails (errno is left as set by the socket),
//  otherwise the size of the message in bytes, capped at INT_MAX.
static int
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
    //  The size is read before the message is handed to the socket.
    const size_t sz = zmq_msg_size (msg_);
    const int rc = s_->send ((zmq::msg_t*) msg_, flags_);
    if (unlikely (rc < 0))
        return -1;

    //  Truncate the returned size to INT_MAX so that very large messages
    //  cannot overflow into a negative value that looks like an error.
    const size_t max_sz = INT_MAX;
    return (int) (sz < max_sz ? sz : max_sz);
}

356
/*  To be deprecated once zmq_msg_send() is stable                           */
skaller's avatar
skaller committed
357 358
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
359
    return zmq_msg_send (msg_, s_, flags_);
Martin Sustrik's avatar
Martin Sustrik committed
360 361
}

362 363
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
{
skaller's avatar
skaller committed
364 365 366 367
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
368 369 370 371 372 373
    zmq_msg_t msg;
    int rc = zmq_msg_init_size (&msg, len_);
    if (rc != 0)
        return -1;
    memcpy (zmq_msg_data (&msg), buf_, len_);

skaller's avatar
skaller committed
374
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
375
    rc = s_sendmsg (s, &msg, flags_);
376 377 378 379 380 381 382 383 384 385
    if (unlikely (rc < 0)) {
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }
    
    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
Uli Köhler's avatar
Uli Köhler committed
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
    return rc;
}

int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq_msg_t msg;
    int rc = zmq_msg_init_data (&msg, (void*)buf_, len_, NULL, NULL);
    if (rc != 0)
        return -1;

    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    rc = s_sendmsg (s, &msg, flags_);
    if (unlikely (rc < 0)) {
        int err = errno;
        int rc2 = zmq_msg_close (&msg);
        errno_assert (rc2 == 0);
        errno = err;
        return -1;
    }
    
    //  Note the optimisation here. We don't close the msg object as it is
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
412 413 414
    return rc;
}

415

skaller's avatar
skaller committed
416
// Send multiple messages.
417
// TODO: this function has no man page
skaller's avatar
skaller committed
418 419 420
//
// If flag bit ZMQ_SNDMORE is set the vector is treated as
// a single multi-part message, i.e. the last message has
skaller's avatar
skaller committed
421
// ZMQ_SNDMORE bit switched off.
skaller's avatar
skaller committed
422
//
423
int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
skaller's avatar
skaller committed
424 425 426 427 428 429 430 431
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    int rc = 0;
    zmq_msg_t msg;
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
432 433
    
    for (size_t i = 0; i < count_; ++i) {
skaller's avatar
skaller committed
434
        rc = zmq_msg_init_size (&msg, a_[i].iov_len);
435
        if (rc != 0) {
skaller's avatar
skaller committed
436 437 438 439
            rc = -1;
            break;
        }
        memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
440 441 442
        if (i == count_ - 1)
            flags_ = flags_ & ~ZMQ_SNDMORE;
        rc = s_sendmsg (s, &msg, flags_);
skaller's avatar
skaller committed
443 444 445 446 447 448 449 450 451 452 453 454
        if (unlikely (rc < 0)) {
           int err = errno;
           int rc2 = zmq_msg_close (&msg);
           errno_assert (rc2 == 0);
           errno = err;
           rc = -1;
           break;
        }
    }
    return rc; 
}

455 456
// Receiving functions.

457 458
//  Receives a message from the socket s_ into msg_ with the given flags.
//  Returns -1 if the receive fails (errno is left as set by the socket),
//  otherwise the size of the received message in bytes, capped at INT_MAX.
static int
s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
    const int rc = s_->recv ((zmq::msg_t*) msg_, flags_);
    if (unlikely (rc < 0))
        return -1;

    //  Truncate the returned size to INT_MAX so that very large messages
    //  cannot overflow into a negative value that looks like an error.
    const size_t sz = zmq_msg_size (msg_);
    const size_t max_sz = INT_MAX;
    return (int) (sz < max_sz ? sz : max_sz);
}

466
/*  To be deprecated once zmq_msg_recv() is stable                           */
skaller's avatar
skaller committed
467 468
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
469
    return zmq_msg_recv (msg_, s_, flags_);
skaller's avatar
skaller committed
470 471 472
}


473 474
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
{
skaller's avatar
skaller committed
475 476 477 478
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
479 480 481 482
    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg);
    errno_assert (rc == 0);

skaller's avatar
skaller committed
483
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
484
    int nbytes = s_recvmsg (s, &msg, flags_);
485
    if (unlikely (nbytes < 0)) {
486
        int err = errno;
487 488
        rc = zmq_msg_close (&msg);
        errno_assert (rc == 0);
489 490 491 492 493 494
        errno = err;
        return -1;
    }

    //  At the moment an oversized message is silently truncated.
    //  TODO: Build in a notification mechanism to report the overflows.
495
    size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
496
    memcpy (buf_, zmq_msg_data (&msg), to_copy);
497 498 499 500

    rc = zmq_msg_close (&msg);
    errno_assert (rc == 0);

501
    return nbytes;
502 503
}

skaller's avatar
skaller committed
504 505 506 507 508 509 510 511 512 513 514 515 516 517
// Receive a multi-part message
// 
// Receives up to *count_ parts of a multi-part message.
// Sets *count_ to the actual number of parts read.
// ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
// Returns number of message parts read, or -1 on error.
//
// Note: even if -1 is returned, some parts of the message
// may have been read. Therefore the client must consult
// *count_ to retrieve message parts successfully read,
// even if -1 is returned.
//
// The iov_base* buffers of each iovec *a_ filled in by this 
// function may be freed using free().
518
// TODO: this function has no man page
skaller's avatar
skaller committed
519
//
520
int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
skaller's avatar
skaller committed
521 522 523 524 525 526 527
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;

528
    size_t count = *count_;
skaller's avatar
skaller committed
529 530
    int nread = 0;
    bool recvmore = true;
531 532
    
    *count_ = 0;
skaller's avatar
skaller committed
533

534
    for (size_t i = 0; recvmore && i < count; ++i) {
535
       
skaller's avatar
skaller committed
536 537 538 539
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        errno_assert (rc == 0);

540
        int nbytes = s_recvmsg (s, &msg, flags_);
skaller's avatar
skaller committed
541 542 543 544 545 546 547 548 549 550
        if (unlikely (nbytes < 0)) {
            int err = errno;
            rc = zmq_msg_close (&msg);
            errno_assert (rc == 0);
            errno = err;
            nread = -1;
            break;
        }

        a_[i].iov_len = zmq_msg_size (&msg);
551
        a_[i].iov_base = malloc(a_[i].iov_len);
552
        if (unlikely (!a_[i].iov_base)) {
553 554 555 556 557
            errno = ENOMEM;
            return -1;
        }
        memcpy(a_[i].iov_base,static_cast<char *> (zmq_msg_data (&msg)),
               a_[i].iov_len);
skaller's avatar
skaller committed
558
        // Assume zmq_socket ZMQ_RVCMORE is properly set.
559
        recvmore = ((zmq::msg_t*) (void *) &msg)->flags () & zmq::msg_t::more;
560 561 562 563
        rc = zmq_msg_close(&msg);
        errno_assert (rc == 0);
        ++*count_;
        ++nread;
skaller's avatar
skaller committed
564
    }
565
    return nread;
skaller's avatar
skaller committed
566 567
}

568 569
// Message manipulators.

570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585
//  Initialises msg_ by calling msg_t::init () on it; returns that result.
int zmq_msg_init (zmq_msg_t *msg_)
{
    zmq::msg_t *msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->init ();
}

//  Initialises msg_ with size_ by calling msg_t::init_size (); returns that
//  result.
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
    zmq::msg_t *msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->init_size (size_);
}

//  Initialises msg_ from the caller-supplied buffer data_ of size_ bytes,
//  passing the free function ffn_ and its hint_ through to
//  msg_t::init_data (); returns that result.
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
    zmq_free_fn *ffn_, void *hint_)
{
    zmq::msg_t *msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->init_data (data_, size_, ffn_, hint_);
}

586 587 588 589 590 591 592
//  Sends msg_ on the socket s_ with the given flags.
//  Returns -1 with errno set to ENOTSOCK if s_ is NULL or fails the tag
//  check; otherwise returns whatever s_sendmsg () returns.
int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *sock = static_cast<zmq::socket_base_t *> (s_);
    if (sock == NULL || !sock->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    return s_sendmsg (sock, msg_, flags_);
}

//  Receives a message from the socket s_ into msg_ with the given flags.
//  Returns -1 with errno set to ENOTSOCK if s_ is NULL or fails the tag
//  check; otherwise returns whatever s_recvmsg () returns.
int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *sock = static_cast<zmq::socket_base_t *> (s_);
    if (sock == NULL || !sock->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    return s_recvmsg (sock, msg_, flags_);
}

608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627
//  Closes msg_ by calling msg_t::close () on it; returns that result.
int zmq_msg_close (zmq_msg_t *msg_)
{
    zmq::msg_t *msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->close ();
}

//  Moves src_ into dest_ by calling msg_t::move (); returns that result.
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *target = reinterpret_cast<zmq::msg_t *> (dest_);
    zmq::msg_t *source = reinterpret_cast<zmq::msg_t *> (src_);
    return target->move (*source);
}

//  Copies src_ into dest_ by calling msg_t::copy (); returns that result.
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *target = reinterpret_cast<zmq::msg_t *> (dest_);
    zmq::msg_t *source = reinterpret_cast<zmq::msg_t *> (src_);
    return target->copy (*source);
}

//  Returns the data pointer of msg_ as reported by msg_t::data ().
void *zmq_msg_data (zmq_msg_t *msg_)
{
    zmq::msg_t *msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->data ();
}

628
//  Returns the size of msg_ as reported by msg_t::size ().
size_t zmq_msg_size (zmq_msg_t *msg_)
{
    zmq::msg_t *msg = reinterpret_cast<zmq::msg_t *> (msg_);
    return msg->size ();
}

633 634
//  Shorthand for querying the ZMQ_MORE property of msg_ through
//  zmq_msg_get (); returns its result unchanged.
int zmq_msg_more (zmq_msg_t *msg_)
{
    const int more = zmq_msg_get (msg_, ZMQ_MORE);
    return more;
}

638
//  Reads the property option_ of msg_.
//  ZMQ_MORE is the only property handled: the result is 1 if the 'more'
//  flag is set on the message and 0 otherwise. Any other option yields -1
//  with errno set to EINVAL.
int zmq_msg_get (zmq_msg_t *msg_, int option_)
{
    if (option_ != ZMQ_MORE) {
        errno = EINVAL;
        return -1;
    }

    zmq::msg_t *msg = reinterpret_cast<zmq::msg_t *> (msg_);
    if (msg->flags () & zmq::msg_t::more)
        return 1;
    return 0;
}

649
//  Setter counterpart of zmq_msg_get (). All arguments are ignored: the
//  call always fails with -1 and errno set to EINVAL.
int zmq_msg_set (zmq_msg_t *, int, int)
{
    //  No options supported at present
    const int failure = -1;
    errno = EINVAL;
    return failure;
}

656 657
// Polling.

658 659 660
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
#if defined ZMQ_POLL_BASED_ON_POLL
661 662 663 664 665 666 667 668
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
669
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
670
        return 0;
Mikko Koppanen's avatar
Mikko Koppanen committed
671 672 673
#elif defined ZMQ_HAVE_ANDROID
        usleep (timeout_ * 1000);
        return 0;
674
#else
675
        return usleep (timeout_ * 1000);
676 677
#endif
    }
678 679 680 681 682 683

    if (!items_) {
        errno = EFAULT;
        return -1;
    }

684 685 686
    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;
687 688
    pollfd spollfds[ZMQ_POLLITEMS_DFLT];
    pollfd *pollfds = spollfds;
689

690 691 692 693
    if (nitems_ > ZMQ_POLLITEMS_DFLT) {
        pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
        alloc_assert (pollfds);
    }
694

695
    //  Build pollset for poll () system call.
696 697
    for (int i = 0; i != nitems_; i++) {

698 699
        //  If the poll item is a 0MQ socket, we poll on the file descriptor
        //  retrieved by the ZMQ_FD socket option.
700
        if (items_ [i].socket) {
701
            size_t zmq_fd_size = sizeof (zmq::fd_t);
702 703
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
                &zmq_fd_size) == -1) {
704 705
                if (pollfds != spollfds)
                    free (pollfds);
706
                return -1;
707
            }
708
            pollfds [i].events = items_ [i].events ? POLLIN : 0;
709
        }
710 711
        //  Else, the poll item is a raw file descriptor. Just convert the
        //  events to normal POLLIN/POLLOUT for poll ().
Martin Lucina's avatar
Martin Lucina committed
712
        else {
713 714 715 716
            pollfds [i].fd = items_ [i].fd;
            pollfds [i].events =
                (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
                (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
Martin Lucina's avatar
Martin Lucina committed
717
        }
718 719
    }

720
    bool first_pass = true;
721
    int nevents = 0;
722

723
    while (true) {
724 725 726 727 728 729 730 731 732
        //  Compute the timeout for the subsequent poll.
        int timeout;
        if (first_pass)
            timeout = 0;
        else
        if (timeout_ < 0)
            timeout = -1;
        else
            timeout = end - now;
733

734
        //  Wait for events.
735
        while (true) {
736
            int rc = poll (pollfds, nitems_, timeout);
737
            if (rc == -1 && errno == EINTR) {
738 739
                if (pollfds != spollfds)
                    free (pollfds);
740
                return -1;
741
            }
742 743
            errno_assert (rc >= 0);
            break;
744
        }
745 746
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
747

748
            items_ [i].revents = 0;
749

750 751
            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
752
            if (items_ [i].socket) {
753 754 755 756
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                    &zmq_events_size) == -1) {
757 758
                    if (pollfds != spollfds)
                        free (pollfds);
759 760 761 762 763 764 765 766
                    return -1;
                }
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
767
            }
768 769 770 771 772 773 774 775 776 777 778 779 780
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (pollfds [i].revents & POLLIN)
                    items_ [i].revents |= ZMQ_POLLIN;
                if (pollfds [i].revents & POLLOUT)
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (pollfds [i].revents & ~(POLLIN | POLLOUT))
                    items_ [i].revents |= ZMQ_POLLERR;
            }

            if (items_ [i].revents)
                nevents++;
781
        }
782

783 784 785
        //  If timeout is zero, exit immediately whether there are events or not.
        if (timeout_ == 0)
            break;
786

787 788 789 790 791 792
        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
793 794 795
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
796
            continue;
797
        }
798

799 800 801 802 803 804
        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling has begun. (We assume that
        //  the first pass has taken negligible time.) We also compute the time
        //  when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
805
            end = now + timeout_;
806 807 808
            if (now == end)
                break;
            first_pass = false;
809 810
            continue;
        }
811

812 813 814 815
        //  Find out whether the timeout has expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
816 817
    }

818 819
    if (pollfds != spollfds)
        free (pollfds);
820 821
    return nevents;

822
#elif defined ZMQ_POLL_BASED_ON_SELECT
823

824 825 826 827 828 829 830 831
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ == 0)) {
        if (timeout_ == 0)
            return 0;
#if defined ZMQ_HAVE_WINDOWS
832
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
833 834
        return 0;
#else
835
        return usleep (timeout_ * 1000);
836 837
#endif
    }
838 839 840 841 842 843 844 845
    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;

    //  Ensure we do not attempt to select () on more than FD_SETSIZE
    //  file descriptors.
    zmq_assert (nitems_ <= FD_SETSIZE);

846 847 848 849 850 851 852
    fd_set pollset_in;
    FD_ZERO (&pollset_in);
    fd_set pollset_out;
    FD_ZERO (&pollset_out);
    fd_set pollset_err;
    FD_ZERO (&pollset_err);

853
    zmq::fd_t maxfd = 0;
854

855
    //  Build the fd_sets for passing to select ().
856 857
    for (int i = 0; i != nitems_; i++) {

858 859
        //  If the poll item is a 0MQ socket we are interested in input on the
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
860
        if (items_ [i].socket) {
861 862 863 864 865
            size_t zmq_fd_size = sizeof (zmq::fd_t);
            zmq::fd_t notify_fd;
            if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
                &zmq_fd_size) == -1)
                return -1;
866 867 868 869 870
            if (items_ [i].events) {
                FD_SET (notify_fd, &pollset_in);
                if (maxfd < notify_fd)
                    maxfd = notify_fd;
            }
871
        }
872 873 874 875 876 877 878 879 880 881 882
        //  Else, the poll item is a raw file descriptor. Convert the poll item
        //  events to the appropriate fd_sets.
        else {
            if (items_ [i].events & ZMQ_POLLIN)
                FD_SET (items_ [i].fd, &pollset_in);
            if (items_ [i].events & ZMQ_POLLOUT)
                FD_SET (items_ [i].fd, &pollset_out);
            if (items_ [i].events & ZMQ_POLLERR)
                FD_SET (items_ [i].fd, &pollset_err);
            if (maxfd < items_ [i].fd)
                maxfd = items_ [i].fd;
883 884 885
        }
    }

886
    bool first_pass = true;
887
    int nevents = 0;
888
    fd_set inset, outset, errset;
889 890

    while (true) {
891

892 893 894 895 896 897 898 899
        //  Compute the timeout for the subsequent poll.
        timeval timeout;
        timeval *ptimeout;
        if (first_pass) {
            timeout.tv_sec = 0;
            timeout.tv_usec = 0;
            ptimeout = &timeout;
        }
900 901
        else
        if (timeout_ < 0)
902 903 904 905 906 907 908
            ptimeout = NULL;
        else {
            timeout.tv_sec = (long) ((end - now) / 1000);
            timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
            ptimeout = &timeout;
        }

909 910 911 912 913
        //  Wait for events. Ignore interrupts if there's infinite timeout.
        while (true) {
            memcpy (&inset, &pollset_in, sizeof (fd_set));
            memcpy (&outset, &pollset_out, sizeof (fd_set));
            memcpy (&errset, &pollset_err, sizeof (fd_set));
914
#if defined ZMQ_HAVE_WINDOWS
915
            int rc = select (0, &inset, &outset, &errset, ptimeout);
916
            if (unlikely (rc == SOCKET_ERROR)) {
917 918 919
                errno = zmq::wsa_error_to_errno (WSAGetLastError ());
                wsa_assert (errno == ENOTSOCK);
                return -1;
920
            }
921
#else
922
            int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
Bernd Prager's avatar
Bernd Prager committed
923
            if (unlikely (rc == -1)) {
924 925
                errno_assert (errno == EINTR || errno == EBADF);
                return -1;
926
            }
927
#endif
928 929
            break;
        }
930

931 932
        //  Check for the events.
        for (int i = 0; i != nitems_; i++) {
933

934
            items_ [i].revents = 0;
935

936 937 938
            //  The poll item is a 0MQ socket. Retrieve pending events
            //  using the ZMQ_EVENTS socket option.
            if (items_ [i].socket) {
939 940 941 942
                size_t zmq_events_size = sizeof (uint32_t);
                uint32_t zmq_events;
                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
                      &zmq_events_size) == -1)
943
                    return -1;
944 945 946 947 948 949
                if ((items_ [i].events & ZMQ_POLLOUT) &&
                      (zmq_events & ZMQ_POLLOUT))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if ((items_ [i].events & ZMQ_POLLIN) &&
                      (zmq_events & ZMQ_POLLIN))
                    items_ [i].revents |= ZMQ_POLLIN;
950 951 952 953 954
            }
            //  Else, the poll item is a raw file descriptor, simply convert
            //  the events to zmq_pollitem_t-style format.
            else {
                if (FD_ISSET (items_ [i].fd, &inset))
955
                    items_ [i].revents |= ZMQ_POLLIN;
956 957 958 959
                if (FD_ISSET (items_ [i].fd, &outset))
                    items_ [i].revents |= ZMQ_POLLOUT;
                if (FD_ISSET (items_ [i].fd, &errset))
                    items_ [i].revents |= ZMQ_POLLERR;
960
            }
961 962 963

            if (items_ [i].revents)
                nevents++;
964
        }
965

966 967 968
        //  If timeout is zero, exit immediately whether there are events or not.
        if (timeout_ == 0)
            break;
969

970 971 972 973 974 975
        //  If there are events to return, we can exit immediately.
        if (nevents)
            break;

        //  At this point we are meant to wait for events but there are none.
        //  If timeout is infinite we can just loop until we get some events.
976 977 978
        if (timeout_ < 0) {
            if (first_pass)
                first_pass = false;
979
            continue;
980
        }
981

982 983 984 985 986 987
        //  The timeout is finite and there are no events. In the first pass
        //  we get a timestamp of when the polling has begun. (We assume that
        //  the first pass has taken negligible time). We also compute the time
        //  when the polling should time out.
        if (first_pass) {
            now = clock.now_ms ();
988
            end = now + timeout_;
989 990 991
            if (now == end)
                break;
            first_pass = false;
992 993
            continue;
        }
994

995 996 997 998
        //  Find out whether the timeout has expired.
        now = clock.now_ms ();
        if (now >= end)
            break;
999 1000 1001 1002 1003
    }

    return nevents;

#else
1004
    //  Exotic platforms that support neither poll() nor select().
1005 1006
    errno = ENOTSUP;
    return -1;
1007 1008 1009
#endif
}

1010 1011 1012 1013 1014 1015
//  Undefine the poll-backend selection macros that were defined near the
//  top of this file, so that neither of them remains defined past this point.
#if defined ZMQ_POLL_BASED_ON_SELECT
#undef ZMQ_POLL_BASED_ON_SELECT
#endif
#if defined ZMQ_POLL_BASED_ON_POLL
#undef ZMQ_POLL_BASED_ON_POLL
#endif
Pieter Hintjens's avatar
Pieter Hintjens committed
1016

1017 1018 1019
//  The proxy functionality

int zmq_proxy (void *frontend_, void *backend_, void *control_)
Pieter Hintjens's avatar
Pieter Hintjens committed
1020
{
1021
    if (!frontend_ || !backend_) {
Pieter Hintjens's avatar
Pieter Hintjens committed
1022 1023 1024
        errno = EFAULT;
        return -1;
    }
1025 1026 1027 1028 1029
    return zmq::proxy (
        (zmq::socket_base_t*) frontend_,
        (zmq::socket_base_t*) backend_,
        (zmq::socket_base_t*) control_);
}
Pieter Hintjens's avatar
Pieter Hintjens committed
1030

1031
//  The deprecated device functionality
Pieter Hintjens's avatar
Pieter Hintjens committed
1032

1033
int zmq_device (int /* type */, void *frontend_, void *backend_)
1034 1035 1036 1037
{
    return zmq::proxy (
        (zmq::socket_base_t*) frontend_,
        (zmq::socket_base_t*) backend_, NULL);
Pieter Hintjens's avatar
Pieter Hintjens committed
1038
}