signaler.cpp 19.1 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "poller.hpp"
31 32 33 34 35

//  On AIX, poll.h has to be included before zmq.h to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
36
#if defined ZMQ_POLL_BASED_ON_POLL
37
#include <poll.h>
38
#elif defined ZMQ_POLL_BASED_ON_SELECT
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#elif defined ZMQ_HAVE_HPUX
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
#else
#include <sys/select.h>
#endif
#endif

#include "signaler.hpp"
#include "likely.hpp"
55
#include "stdint.hpp"
56
#include "config.hpp"
57 58 59 60
#include "err.hpp"
#include "fd.hpp"
#include "ip.hpp"

61 62 63 64
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#endif

65 66 67 68 69 70 71 72 73 74
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#endif

75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
#if !defined (ZMQ_HAVE_WINDOWS)
// Helper to sleep for specific number of milliseconds (or until signal)
//
static int sleep_ms (unsigned int ms_)
{
    if (ms_ == 0)
        return 0;
#if defined ZMQ_HAVE_WINDOWS
    Sleep (ms_ > 0 ? ms_ : INFINITE);
    return 0;
#elif defined ZMQ_HAVE_ANDROID
    usleep (ms_ * 1000);
    return 0;
#else
    return usleep (ms_ * 1000);
#endif
}

// Helper to wait on close(), for non-blocking sockets, until it completes
// If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
// the overall timeout is reached.
//
static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
{
    unsigned int ms_so_far = 0;
    unsigned int step_ms   = max_ms_ / 10;
    if (step_ms < 1)
        step_ms = 1;
    if (step_ms > 100)
        step_ms = 100;

    int rc = 0;       // do not sleep on first attempt
Pieter Hintjens's avatar
Pieter Hintjens committed
107 108
    do {
        if (rc == -1 && errno == EAGAIN) {
109 110 111 112 113 114 115 116 117 118
            sleep_ms (step_ms);
            ms_so_far += step_ms;
        }
        rc = close (fd_);
    } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);

    return rc;
}
#endif

119 120 121
zmq::signaler_t::signaler_t ()
{
    //  Create the socketpair for signaling.
122 123 124 125
    if (make_fdpair (&r, &w) == 0) {
        unblock_socket (w);
        unblock_socket (r);
    }
126
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
127
    pid = getpid ();
128
#endif
129 130
}

131 132
// This might get run after some part of construction failed, leaving one or
// both of r and w retired_fd.
133 134
zmq::signaler_t::~signaler_t ()
{
135
#if defined ZMQ_HAVE_EVENTFD
136
    if (r == retired_fd) return;
137
    int rc = close_wait_ms (r);
138 139
    errno_assert (rc == 0);
#elif defined ZMQ_HAVE_WINDOWS
140 141 142 143 144 145 146 147 148 149 150 151 152
    if (w != retired_fd) {
        const struct linger so_linger = { 1, 0 };
        int rc = setsockopt (w, SOL_SOCKET, SO_LINGER,
            (const char *) &so_linger, sizeof so_linger);
        //  Only check shutdown if WSASTARTUP was previously done
        if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
            wsa_assert (rc != SOCKET_ERROR);
            rc = closesocket (w);
            wsa_assert (rc != SOCKET_ERROR);
            if (r == retired_fd) return;
            rc = closesocket (r);
            wsa_assert (rc != SOCKET_ERROR);
        }
Richard Newton's avatar
Richard Newton committed
153
    }
154
#else
155 156 157 158 159
    if (w != retired_fd) {
        int rc = close_wait_ms (w);
        errno_assert (rc == 0);
    }
    if (r != retired_fd) {
160
        int rc = close_wait_ms (r);
161 162
        errno_assert (rc == 0);
    }
163 164 165
#endif
}

Martin Hurton's avatar
Martin Hurton committed
166
zmq::fd_t zmq::signaler_t::get_fd () const
167 168 169 170 171 172
{
    return r;
}

void zmq::signaler_t::send ()
{
Martin Hurton's avatar
Martin Hurton committed
173 174
#if defined HAVE_FORK
    if (unlikely (pid != getpid ())) {
175 176 177 178
        //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
        return; // do not send anything in forked child context
    }
#endif
179 180 181 182 183
#if defined ZMQ_HAVE_EVENTFD
    const uint64_t inc = 1;
    ssize_t sz = write (w, &inc, sizeof (inc));
    errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS
184
    unsigned char dummy = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
185
    int nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0);
186 187 188 189 190 191 192 193
    wsa_assert (nbytes != SOCKET_ERROR);
    zmq_assert (nbytes == sizeof (dummy));
#else
    unsigned char dummy = 0;
    while (true) {
        ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
194
#if defined(HAVE_FORK)
Martin Hurton's avatar
Martin Hurton committed
195
        if (unlikely (pid != getpid ())) {
196 197 198 199 200
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
Martin Hurton's avatar
Martin Hurton committed
201
        zmq_assert (nbytes == sizeof dummy);
202 203 204 205 206 207 208
        break;
    }
#endif
}

int zmq::signaler_t::wait (int timeout_)
{
209
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
210
    if (unlikely (pid != getpid ())) {
211
        // we have forked and the file descriptor is closed. Emulate an interrupt
212 213 214 215 216 217 218
        // response.
        //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
        errno = EINTR;
        return -1;
    }
#endif

219
#ifdef ZMQ_POLL_BASED_ON_POLL
220 221 222 223 224
    struct pollfd pfd;
    pfd.fd = r;
    pfd.events = POLLIN;
    int rc = poll (&pfd, 1, timeout_);
    if (unlikely (rc < 0)) {
225
        errno_assert (errno == EINTR);
226 227
        return -1;
    }
228 229
    else
    if (unlikely (rc == 0)) {
230 231 232
        errno = EAGAIN;
        return -1;
    }
233
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
234 235
    else
    if (unlikely (pid != getpid ())) {
236
        // we have forked and the file descriptor is closed. Emulate an interrupt
237 238 239 240 241 242
        // response.
        //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
        errno = EINTR;
        return -1;
    }
#endif
243 244 245 246
    zmq_assert (rc == 1);
    zmq_assert (pfd.revents & POLLIN);
    return 0;

247
#elif defined ZMQ_POLL_BASED_ON_SELECT
248 249 250 251 252

    fd_set fds;
    FD_ZERO (&fds);
    FD_SET (r, &fds);
    struct timeval timeout;
253 254 255 256
    if (timeout_ >= 0) {
        timeout.tv_sec = timeout_ / 1000;
        timeout.tv_usec = timeout_ % 1000 * 1000;
    }
257
#ifdef ZMQ_HAVE_WINDOWS
258 259
    int rc = select (0, &fds, NULL, NULL,
        timeout_ >= 0 ? &timeout : NULL);
260 261
    wsa_assert (rc != SOCKET_ERROR);
#else
262 263
    int rc = select (r + 1, &fds, NULL, NULL,
        timeout_ >= 0 ? &timeout : NULL);
264
    if (unlikely (rc < 0)) {
265
        errno_assert (errno == EINTR);
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
        return -1;
    }
#endif
    if (unlikely (rc == 0)) {
        errno = EAGAIN;
        return -1;
    }
    zmq_assert (rc == 1);
    return 0;

#else
#error
#endif
}

void zmq::signaler_t::recv ()
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
{
    //  Attempt to read a signal.
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
    errno_assert (sz == sizeof (dummy));

    //  If we accidentally grabbed the next signal(s) along with the current
    //  one, return it back to the eventfd object.
    if (unlikely (dummy > 1)) {
        const uint64_t inc = dummy - 1;
        ssize_t sz2 = write (w, &inc, sizeof (inc));
        errno_assert (sz2 == sizeof (inc));
        return;
    }

    zmq_assert (dummy == 1);
#else
    unsigned char dummy;
#if defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
302
    int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
303 304 305 306 307 308 309 310 311 312 313
    wsa_assert (nbytes != SOCKET_ERROR);
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
    errno_assert (nbytes >= 0);
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
#endif
}

int zmq::signaler_t::recv_failable ()
314 315
{
    //  Attempt to read a signal.
316 317 318
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
319 320
    if (sz == -1) {
        errno_assert (errno == EAGAIN);
321
        return -1;
322
    }
323 324 325 326 327 328 329 330 331
    else {
        errno_assert (sz == sizeof (dummy));

        //  If we accidentally grabbed the next signal(s) along with the current
        //  one, return it back to the eventfd object.
        if (unlikely (dummy > 1)) {
            const uint64_t inc = dummy - 1;
            ssize_t sz2 = write (w, &inc, sizeof (inc));
            errno_assert (sz2 == sizeof (inc));
332
            return 0;
333
        }
334

335 336
        zmq_assert (dummy == 1);
    }
337
#else
338
    unsigned char dummy;
Martin Sustrik's avatar
Martin Sustrik committed
339
#if defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
340
    int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
341 342 343 344 345 346 347 348
    if (nbytes == SOCKET_ERROR) {
		const int last_error = WSAGetLastError();
		if (last_error == WSAEWOULDBLOCK) {
			errno = EAGAIN;
            return -1;
		}
        wsa_assert (last_error == WSAEWOULDBLOCK);
    }
349 350
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
351 352 353 354 355 356 357
    if (nbytes == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            errno = EAGAIN;
            return -1;
        }
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
    }
358 359 360
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
361
#endif
362
    return 0;
363 364
}

365
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
366
void zmq::signaler_t::forked ()
367
{
368 369 370
    //  Close file descriptors created in the parent and create new pair
    close (r);
    close (w);
371
    make_fdpair (&r, &w);
372 373 374
}
#endif

Pieter Hintjens's avatar
Pieter Hintjens committed
375
//  Returns -1 if we could not make the socket pair successfully
376 377
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
{
378 379
#if defined ZMQ_HAVE_EVENTFD
    fd_t fd = eventfd (0, 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
380 381 382 383 384 385 386 387 388
    if (fd == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
        *w_ = *r_ = -1;
        return -1;
    }
    else {
        *w_ = *r_ = fd;
        return 0;
    }
389 390

#elif defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
391
#   if !defined _WIN32_WCE
392
    // Windows CE does not manage security attributes
Matt Arsenault's avatar
Matt Arsenault committed
393 394
    SECURITY_DESCRIPTOR sd;
    SECURITY_ATTRIBUTES sa;
Martin Hurton's avatar
Martin Hurton committed
395 396
    memset (&sd, 0, sizeof sd);
    memset (&sa, 0, sizeof sa);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
397

Martin Hurton's avatar
Martin Hurton committed
398 399
    InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
    SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
400

Martin Hurton's avatar
Martin Hurton committed
401
    sa.nLength = sizeof (SECURITY_ATTRIBUTES);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
402
    sa.lpSecurityDescriptor = &sd;
Pieter Hintjens's avatar
Pieter Hintjens committed
403
#   endif
404

405 406 407 408
    //  This function has to be in a system-wide critical section so that
    //  two instances of the library don't accidentally create signaler
    //  crossing the process boundary.
    //  We'll use named event object to implement the critical section.
Martin Hurton's avatar
Martin Hurton committed
409 410 411
    //  Note that if the event object already exists, the CreateEvent requests
    //  EVENT_ALL_ACCESS access right. If this fails, we try to open
    //  the event object asking for SYNCHRONIZE access only.
412 413 414
    HANDLE sync = NULL;

    //  Create critical section only if using fixed signaler port
415 416 417 418 419
    //  Use problematic Event implementation for compatibility if using old port 5905.
    //  Otherwise use Mutex implementation.
    int event_signaler_port = 5905;

    if (signaler_port == event_signaler_port) {
420
#       if !defined _WIN32_WCE
421
        sync = CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
422
#       else
423
        sync = CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
424 425
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
426
            sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE,
427
                              FALSE, L"Global\\zmq-signaler-port-sync");
428 429 430

        win_assert (sync != NULL);
    }
Martin Hurton's avatar
Martin Hurton committed
431 432 433
    else
    if (signaler_port != 0) {
        wchar_t mutex_name [MAX_PATH];
434 435 436
#       ifdef __MINGW32__
        _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
#       else
Martin Hurton's avatar
Martin Hurton committed
437
        swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
438
#       endif
439 440

#       if !defined _WIN32_WCE
441
        sync = CreateMutexW (&sa, FALSE, mutex_name);
442
#       else
443
        sync = CreateMutexW (NULL, FALSE, mutex_name);
444 445
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
446
            sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
447 448 449

        win_assert (sync != NULL);
    }
450

451 452 453 454 455 456 457
    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
    //  handles cannot be polled on. Here we create the socketpair by hand.
    *w_ = INVALID_SOCKET;
    *r_ = INVALID_SOCKET;

    //  Create listening socket.
    SOCKET listener;
458
    listener = open_socket (AF_INET, SOCK_STREAM, 0);
459 460 461 462 463
    wsa_assert (listener != INVALID_SOCKET);

    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
    BOOL so_reuseaddr = 1;
    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
Pieter Hintjens's avatar
Pieter Hintjens committed
464
        (char *) &so_reuseaddr, sizeof so_reuseaddr);
465 466 467
    wsa_assert (rc != SOCKET_ERROR);
    BOOL tcp_nodelay = 1;
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
Pieter Hintjens's avatar
Pieter Hintjens committed
468
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
469 470
    wsa_assert (rc != SOCKET_ERROR);

471
    //  Init sockaddr to signaler port.
472
    struct sockaddr_in addr;
Martin Hurton's avatar
Martin Hurton committed
473
    memset (&addr, 0, sizeof addr);
474 475
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
476
    addr.sin_port = htons (signaler_port);
477 478

    //  Create the writer socket.
479
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
480 481 482 483
    wsa_assert (*w_ != INVALID_SOCKET);

    //  Set TCP_NODELAY on writer socket.
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
Martin Hurton's avatar
Martin Hurton committed
484
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
485 486
    wsa_assert (rc != SOCKET_ERROR);

487 488 489
    if (sync != NULL) {
        //  Enter the critical section.
        DWORD dwrc = WaitForSingleObject (sync, INFINITE);
490
        zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
491
    }
492 493

    //  Bind listening socket to signaler port.
Pieter Hintjens's avatar
Pieter Hintjens committed
494
    rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr);
495

496 497
    if (rc != SOCKET_ERROR && signaler_port == 0) {
        //  Retrieve ephemeral port number
Martin Hurton's avatar
Martin Hurton committed
498
        int addrlen = sizeof addr;
Pieter Hintjens's avatar
Pieter Hintjens committed
499
        rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen);
500 501
    }

502 503 504 505
    //  Listen for incoming connections.
    if (rc != SOCKET_ERROR)
        rc = listen (listener, 1);

506
    //  Connect writer to the listener.
507
    if (rc != SOCKET_ERROR)
Pieter Hintjens's avatar
Pieter Hintjens committed
508
        rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr);
509

510 511
    //  Accept connection from writer.
    if (rc != SOCKET_ERROR)
512
        *r_ = accept (listener, NULL, NULL);
513

514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534
    //  Send/receive large chunk to work around TCP slow start
    //  This code is a workaround for #1608
    if (*r_ != INVALID_SOCKET) {
        size_t dummy_size = 1024 * 1024;        //  1M to overload default receive buffer
        unsigned char *dummy = (unsigned char *) malloc (dummy_size);
        int still_to_send = (int) dummy_size;
        int still_to_recv = (int) dummy_size;
        while (still_to_send || still_to_recv) {
            int nbytes;
            if (still_to_send > 0) {
                nbytes = ::send (*w_, (char *) (dummy + dummy_size - still_to_send), still_to_send, 0);
                wsa_assert (nbytes != SOCKET_ERROR);
                still_to_send -= nbytes;
            }
            nbytes = ::recv (*r_, (char *) (dummy + dummy_size - still_to_recv), still_to_recv, 0);
            wsa_assert (nbytes != SOCKET_ERROR);
            still_to_recv -= nbytes;
        }
        free (dummy);
    }

535 536 537 538 539
    //  Save errno if error occurred in bind/listen/connect/accept.
    int saved_errno = 0;
    if (*r_ == INVALID_SOCKET)
        saved_errno = WSAGetLastError ();

540
    //  We don't need the listening socket anymore. Close it.
541
    closesocket (listener);
542

543 544
    if (sync != NULL) {
        //  Exit the critical section.
545 546 547 548 549
        BOOL brc;
        if (signaler_port == event_signaler_port)
            brc = SetEvent (sync);
        else
            brc = ReleaseMutex (sync);
550
        win_assert (brc != 0);
551

552 553 554 555
        //  Release the kernel object
        brc = CloseHandle (sync);
        win_assert (brc != 0);
    }
556

557
    if (*r_ != INVALID_SOCKET) {
Pieter Hintjens's avatar
Pieter Hintjens committed
558
#   if !defined _WIN32_WCE
559
        //  On Windows, preventing sockets to be inherited by child processes.
560
        BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
561
        win_assert (brc);
Pieter Hintjens's avatar
Pieter Hintjens committed
562
#   endif
563
        return 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
564 565
    }
    else {
566
        //  Cleanup writer if connection failed
567 568 569 570 571
        if (*w_ != INVALID_SOCKET) {
            rc = closesocket (*w_);
            wsa_assert (rc != SOCKET_ERROR);
            *w_ = INVALID_SOCKET;
        }
572
        //  Set errno from saved value
573
        errno = wsa_error_to_errno (saved_errno);
574 575
        return -1;
    }
576 577 578 579 580 581 582 583 584

#elif defined ZMQ_HAVE_OPENVMS

    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
    //  can lead to performance problems.
    //
    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
    //  create the socket pair manually.
585
    struct sockaddr_in lcladdr;
Martin Hurton's avatar
Martin Hurton committed
586
    memset (&lcladdr, 0, sizeof lcladdr);
587 588 589 590
    lcladdr.sin_family = AF_INET;
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;

591
    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
592 593 594
    errno_assert (listener != -1);

    int on = 1;
Martin Hurton's avatar
Martin Hurton committed
595
    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
596 597
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
598
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
599 600
    errno_assert (rc != -1);

Pieter Hintjens's avatar
Pieter Hintjens committed
601
    rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
602 603
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
604
    socklen_t lcladdr_len = sizeof lcladdr;
605

Pieter Hintjens's avatar
Pieter Hintjens committed
606
    rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
607 608 609 610 611
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

612
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
613 614
    errno_assert (*w_ != -1);

Martin Hurton's avatar
Martin Hurton committed
615
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
616 617
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
618
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
619 620
    errno_assert (rc != -1);

Pieter Hintjens's avatar
Pieter Hintjens committed
621
    rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
622 623 624 625 626 627 628 629 630
    errno_assert (rc != -1);

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    close (listener);

    return 0;

Pieter Hintjens's avatar
Pieter Hintjens committed
631 632
#else
    // All other implementations support socketpair()
633 634
    int sv [2];
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
Pieter Hintjens's avatar
Pieter Hintjens committed
635 636
    if (rc == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
637
        *w_ = *r_ = -1;
Pieter Hintjens's avatar
Pieter Hintjens committed
638 639 640 641 642 643 644
        return -1;
    }
    else {
        *w_ = sv [0];
        *r_ = sv [1];
        return 0;
    }
645 646
#endif
}