signaler.cpp 19.1 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "poller.hpp"
32 33 34 35 36

//  On AIX, poll.h has to be included before zmq.h to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
37
#if defined ZMQ_POLL_BASED_ON_POLL
38
#if !defined ZMQ_HAVE_WINDOWS
39
#include <poll.h>
40
#endif
41
#elif defined ZMQ_POLL_BASED_ON_SELECT
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
#if defined ZMQ_HAVE_WINDOWS
#elif defined ZMQ_HAVE_HPUX
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
#else
#include <sys/select.h>
#endif
#endif

#include "signaler.hpp"
#include "likely.hpp"
57
#include "stdint.hpp"
58
#include "config.hpp"
59 60 61 62
#include "err.hpp"
#include "fd.hpp"
#include "ip.hpp"

63 64 65 66
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#endif

67
#if !defined ZMQ_HAVE_WINDOWS
68 69 70 71 72 73
#include <unistd.h>
#include <netinet/tcp.h>
#include <sys/types.h>
#include <sys/socket.h>
#endif

74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
#if !defined (ZMQ_HAVE_WINDOWS)
// Helper to sleep for specific number of milliseconds (or until signal)
//
static int sleep_ms (unsigned int ms_)
{
    if (ms_ == 0)
        return 0;
#if defined ZMQ_HAVE_WINDOWS
    Sleep (ms_ > 0 ? ms_ : INFINITE);
    return 0;
#elif defined ZMQ_HAVE_ANDROID
    usleep (ms_ * 1000);
    return 0;
#else
    return usleep (ms_ * 1000);
#endif
}

// Helper to wait on close(), for non-blocking sockets, until it completes
// If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
// the overall timeout is reached.
//
static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
{
    unsigned int ms_so_far = 0;
    unsigned int step_ms   = max_ms_ / 10;
    if (step_ms < 1)
        step_ms = 1;
    if (step_ms > 100)
        step_ms = 100;

    int rc = 0;       // do not sleep on first attempt
Pieter Hintjens's avatar
Pieter Hintjens committed
106 107
    do {
        if (rc == -1 && errno == EAGAIN) {
108 109 110 111 112 113 114 115 116 117
            sleep_ms (step_ms);
            ms_so_far += step_ms;
        }
        rc = close (fd_);
    } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);

    return rc;
}
#endif

118 119 120
zmq::signaler_t::signaler_t ()
{
    //  Create the socketpair for signaling.
121 122 123 124
    if (make_fdpair (&r, &w) == 0) {
        unblock_socket (w);
        unblock_socket (r);
    }
125
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
126
    pid = getpid ();
127
#endif
128 129
}

130 131
// This might get run after some part of construction failed, leaving one or
// both of r and w retired_fd.
132 133
zmq::signaler_t::~signaler_t ()
{
134
#if defined ZMQ_HAVE_EVENTFD
135
    if (r == retired_fd) return;
136
    int rc = close_wait_ms (r);
137 138
    errno_assert (rc == 0);
#elif defined ZMQ_HAVE_WINDOWS
139 140 141 142 143 144 145 146 147 148 149 150 151
    if (w != retired_fd) {
        const struct linger so_linger = { 1, 0 };
        int rc = setsockopt (w, SOL_SOCKET, SO_LINGER,
            (const char *) &so_linger, sizeof so_linger);
        //  Only check shutdown if WSASTARTUP was previously done
        if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
            wsa_assert (rc != SOCKET_ERROR);
            rc = closesocket (w);
            wsa_assert (rc != SOCKET_ERROR);
            if (r == retired_fd) return;
            rc = closesocket (r);
            wsa_assert (rc != SOCKET_ERROR);
        }
Richard Newton's avatar
Richard Newton committed
152
    }
153
#else
154 155 156 157 158
    if (w != retired_fd) {
        int rc = close_wait_ms (w);
        errno_assert (rc == 0);
    }
    if (r != retired_fd) {
159
        int rc = close_wait_ms (r);
160 161
        errno_assert (rc == 0);
    }
162 163 164
#endif
}

Martin Hurton's avatar
Martin Hurton committed
165
zmq::fd_t zmq::signaler_t::get_fd () const
166 167 168 169 170 171
{
    return r;
}

void zmq::signaler_t::send ()
{
Martin Hurton's avatar
Martin Hurton committed
172 173
#if defined HAVE_FORK
    if (unlikely (pid != getpid ())) {
174 175 176 177
        //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
        return; // do not send anything in forked child context
    }
#endif
178 179 180 181 182
#if defined ZMQ_HAVE_EVENTFD
    const uint64_t inc = 1;
    ssize_t sz = write (w, &inc, sizeof (inc));
    errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS
183
    unsigned char dummy = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
184
    int nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0);
185 186 187 188 189 190 191 192
    wsa_assert (nbytes != SOCKET_ERROR);
    zmq_assert (nbytes == sizeof (dummy));
#else
    unsigned char dummy = 0;
    while (true) {
        ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
193
#if defined(HAVE_FORK)
Martin Hurton's avatar
Martin Hurton committed
194
        if (unlikely (pid != getpid ())) {
195 196 197 198 199
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
Martin Hurton's avatar
Martin Hurton committed
200
        zmq_assert (nbytes == sizeof dummy);
201 202 203 204 205 206 207
        break;
    }
#endif
}

int zmq::signaler_t::wait (int timeout_)
{
208
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
209
    if (unlikely (pid != getpid ())) {
210
        // we have forked and the file descriptor is closed. Emulate an interrupt
211 212 213 214 215 216 217
        // response.
        //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
        errno = EINTR;
        return -1;
    }
#endif

218
#ifdef ZMQ_POLL_BASED_ON_POLL
219 220 221 222 223
    struct pollfd pfd;
    pfd.fd = r;
    pfd.events = POLLIN;
    int rc = poll (&pfd, 1, timeout_);
    if (unlikely (rc < 0)) {
224
        errno_assert (errno == EINTR);
225 226
        return -1;
    }
227 228
    else
    if (unlikely (rc == 0)) {
229 230 231
        errno = EAGAIN;
        return -1;
    }
232
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
233 234
    else
    if (unlikely (pid != getpid ())) {
235
        // we have forked and the file descriptor is closed. Emulate an interrupt
236 237 238 239 240 241
        // response.
        //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
        errno = EINTR;
        return -1;
    }
#endif
242 243 244 245
    zmq_assert (rc == 1);
    zmq_assert (pfd.revents & POLLIN);
    return 0;

246
#elif defined ZMQ_POLL_BASED_ON_SELECT
247 248 249 250 251

    fd_set fds;
    FD_ZERO (&fds);
    FD_SET (r, &fds);
    struct timeval timeout;
252 253 254 255
    if (timeout_ >= 0) {
        timeout.tv_sec = timeout_ / 1000;
        timeout.tv_usec = timeout_ % 1000 * 1000;
    }
256
#ifdef ZMQ_HAVE_WINDOWS
257 258
    int rc = select (0, &fds, NULL, NULL,
        timeout_ >= 0 ? &timeout : NULL);
259 260
    wsa_assert (rc != SOCKET_ERROR);
#else
261 262
    int rc = select (r + 1, &fds, NULL, NULL,
        timeout_ >= 0 ? &timeout : NULL);
263
    if (unlikely (rc < 0)) {
264
        errno_assert (errno == EINTR);
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
        return -1;
    }
#endif
    if (unlikely (rc == 0)) {
        errno = EAGAIN;
        return -1;
    }
    zmq_assert (rc == 1);
    return 0;

#else
#error
#endif
}

void zmq::signaler_t::recv ()
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
{
    //  Attempt to read a signal.
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
    errno_assert (sz == sizeof (dummy));

    //  If we accidentally grabbed the next signal(s) along with the current
    //  one, return it back to the eventfd object.
    if (unlikely (dummy > 1)) {
        const uint64_t inc = dummy - 1;
        ssize_t sz2 = write (w, &inc, sizeof (inc));
        errno_assert (sz2 == sizeof (inc));
        return;
    }

    zmq_assert (dummy == 1);
#else
    unsigned char dummy;
#if defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
301
    int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
302 303 304 305 306 307 308 309 310 311 312
    wsa_assert (nbytes != SOCKET_ERROR);
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
    errno_assert (nbytes >= 0);
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
#endif
}

int zmq::signaler_t::recv_failable ()
313 314
{
    //  Attempt to read a signal.
315 316 317
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
318 319
    if (sz == -1) {
        errno_assert (errno == EAGAIN);
320
        return -1;
321
    }
322 323 324 325 326 327 328 329 330
    else {
        errno_assert (sz == sizeof (dummy));

        //  If we accidentally grabbed the next signal(s) along with the current
        //  one, return it back to the eventfd object.
        if (unlikely (dummy > 1)) {
            const uint64_t inc = dummy - 1;
            ssize_t sz2 = write (w, &inc, sizeof (inc));
            errno_assert (sz2 == sizeof (inc));
331
            return 0;
332
        }
333

334 335
        zmq_assert (dummy == 1);
    }
336
#else
337
    unsigned char dummy;
Martin Sustrik's avatar
Martin Sustrik committed
338
#if defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
339
    int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
340
    if (nbytes == SOCKET_ERROR) {
341 342 343
        const int last_error = WSAGetLastError();
        if (last_error == WSAEWOULDBLOCK) {
            errno = EAGAIN;
344
            return -1;
345
        }
346 347
        wsa_assert (last_error == WSAEWOULDBLOCK);
    }
348 349
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
350 351 352 353 354 355 356
    if (nbytes == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            errno = EAGAIN;
            return -1;
        }
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
    }
357 358 359
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
360
#endif
361
    return 0;
362 363
}

364
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
365
void zmq::signaler_t::forked ()
366
{
367 368 369
    //  Close file descriptors created in the parent and create new pair
    close (r);
    close (w);
370
    make_fdpair (&r, &w);
371 372 373
}
#endif

Pieter Hintjens's avatar
Pieter Hintjens committed
374
//  Returns -1 if we could not make the socket pair successfully
375 376
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
{
377 378
#if defined ZMQ_HAVE_EVENTFD
    fd_t fd = eventfd (0, 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
379 380 381 382 383 384 385 386 387
    if (fd == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
        *w_ = *r_ = -1;
        return -1;
    }
    else {
        *w_ = *r_ = fd;
        return 0;
    }
388 389

#elif defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
390
#   if !defined _WIN32_WCE
391
    //  Windows CE does not manage security attributes
Matt Arsenault's avatar
Matt Arsenault committed
392 393
    SECURITY_DESCRIPTOR sd;
    SECURITY_ATTRIBUTES sa;
Martin Hurton's avatar
Martin Hurton committed
394 395
    memset (&sd, 0, sizeof sd);
    memset (&sa, 0, sizeof sa);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
396

Martin Hurton's avatar
Martin Hurton committed
397 398
    InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
    SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
399

Martin Hurton's avatar
Martin Hurton committed
400
    sa.nLength = sizeof (SECURITY_ATTRIBUTES);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
401
    sa.lpSecurityDescriptor = &sd;
Pieter Hintjens's avatar
Pieter Hintjens committed
402
#   endif
403

404 405 406 407
    //  This function has to be in a system-wide critical section so that
    //  two instances of the library don't accidentally create signaler
    //  crossing the process boundary.
    //  We'll use named event object to implement the critical section.
Martin Hurton's avatar
Martin Hurton committed
408 409 410
    //  Note that if the event object already exists, the CreateEvent requests
    //  EVENT_ALL_ACCESS access right. If this fails, we try to open
    //  the event object asking for SYNCHRONIZE access only.
411 412 413
    HANDLE sync = NULL;

    //  Create critical section only if using fixed signaler port
414 415 416 417 418
    //  Use problematic Event implementation for compatibility if using old port 5905.
    //  Otherwise use Mutex implementation.
    int event_signaler_port = 5905;

    if (signaler_port == event_signaler_port) {
419
#       if !defined _WIN32_WCE
420
        sync = CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
421
#       else
422
        sync = CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
423 424
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
425
            sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE,
426
                              FALSE, L"Global\\zmq-signaler-port-sync");
427 428 429

        win_assert (sync != NULL);
    }
Martin Hurton's avatar
Martin Hurton committed
430 431 432
    else
    if (signaler_port != 0) {
        wchar_t mutex_name [MAX_PATH];
433 434 435
#       ifdef __MINGW32__
        _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
#       else
Martin Hurton's avatar
Martin Hurton committed
436
        swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
437
#       endif
438 439

#       if !defined _WIN32_WCE
440
        sync = CreateMutexW (&sa, FALSE, mutex_name);
441
#       else
442
        sync = CreateMutexW (NULL, FALSE, mutex_name);
443 444
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
445
            sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
446 447 448

        win_assert (sync != NULL);
    }
449

450 451 452 453 454 455 456
    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
    //  handles cannot be polled on. Here we create the socketpair by hand.
    *w_ = INVALID_SOCKET;
    *r_ = INVALID_SOCKET;

    //  Create listening socket.
    SOCKET listener;
457
    listener = open_socket (AF_INET, SOCK_STREAM, 0);
458 459 460 461 462
    wsa_assert (listener != INVALID_SOCKET);

    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
    BOOL so_reuseaddr = 1;
    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
Pieter Hintjens's avatar
Pieter Hintjens committed
463
        (char *) &so_reuseaddr, sizeof so_reuseaddr);
464 465 466
    wsa_assert (rc != SOCKET_ERROR);
    BOOL tcp_nodelay = 1;
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
Pieter Hintjens's avatar
Pieter Hintjens committed
467
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
468 469
    wsa_assert (rc != SOCKET_ERROR);

470
    //  Init sockaddr to signaler port.
471
    struct sockaddr_in addr;
Martin Hurton's avatar
Martin Hurton committed
472
    memset (&addr, 0, sizeof addr);
473 474
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
475
    addr.sin_port = htons (signaler_port);
476 477

    //  Create the writer socket.
478
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
479 480 481 482
    wsa_assert (*w_ != INVALID_SOCKET);

    //  Set TCP_NODELAY on writer socket.
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
Martin Hurton's avatar
Martin Hurton committed
483
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
484 485
    wsa_assert (rc != SOCKET_ERROR);

486 487 488
    if (sync != NULL) {
        //  Enter the critical section.
        DWORD dwrc = WaitForSingleObject (sync, INFINITE);
489
        zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
490
    }
491 492

    //  Bind listening socket to signaler port.
Pieter Hintjens's avatar
Pieter Hintjens committed
493
    rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr);
494

495 496
    if (rc != SOCKET_ERROR && signaler_port == 0) {
        //  Retrieve ephemeral port number
Martin Hurton's avatar
Martin Hurton committed
497
        int addrlen = sizeof addr;
Pieter Hintjens's avatar
Pieter Hintjens committed
498
        rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen);
499 500
    }

501 502 503 504
    //  Listen for incoming connections.
    if (rc != SOCKET_ERROR)
        rc = listen (listener, 1);

505
    //  Connect writer to the listener.
506
    if (rc != SOCKET_ERROR)
Pieter Hintjens's avatar
Pieter Hintjens committed
507
        rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr);
508

509 510
    //  Accept connection from writer.
    if (rc != SOCKET_ERROR)
511
        *r_ = accept (listener, NULL, NULL);
512

513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533
    //  Send/receive large chunk to work around TCP slow start
    //  This code is a workaround for #1608
    if (*r_ != INVALID_SOCKET) {
        size_t dummy_size = 1024 * 1024;        //  1M to overload default receive buffer
        unsigned char *dummy = (unsigned char *) malloc (dummy_size);
        int still_to_send = (int) dummy_size;
        int still_to_recv = (int) dummy_size;
        while (still_to_send || still_to_recv) {
            int nbytes;
            if (still_to_send > 0) {
                nbytes = ::send (*w_, (char *) (dummy + dummy_size - still_to_send), still_to_send, 0);
                wsa_assert (nbytes != SOCKET_ERROR);
                still_to_send -= nbytes;
            }
            nbytes = ::recv (*r_, (char *) (dummy + dummy_size - still_to_recv), still_to_recv, 0);
            wsa_assert (nbytes != SOCKET_ERROR);
            still_to_recv -= nbytes;
        }
        free (dummy);
    }

534 535 536 537 538
    //  Save errno if error occurred in bind/listen/connect/accept.
    int saved_errno = 0;
    if (*r_ == INVALID_SOCKET)
        saved_errno = WSAGetLastError ();

539
    //  We don't need the listening socket anymore. Close it.
540
    rc = closesocket (listener);
541
    wsa_assert(rc != SOCKET_ERROR);
542

543 544
    if (sync != NULL) {
        //  Exit the critical section.
545 546 547 548 549
        BOOL brc;
        if (signaler_port == event_signaler_port)
            brc = SetEvent (sync);
        else
            brc = ReleaseMutex (sync);
550
        win_assert (brc != 0);
551

552 553 554 555
        //  Release the kernel object
        brc = CloseHandle (sync);
        win_assert (brc != 0);
    }
556

557
    if (*r_ != INVALID_SOCKET) {
Pieter Hintjens's avatar
Pieter Hintjens committed
558
#   if !defined _WIN32_WCE
559
        //  On Windows, preventing sockets to be inherited by child processes.
560
        BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
561
        win_assert (brc);
Pieter Hintjens's avatar
Pieter Hintjens committed
562
#   endif
563
        return 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
564 565
    }
    else {
566
        //  Cleanup writer if connection failed
567 568 569 570 571
        if (*w_ != INVALID_SOCKET) {
            rc = closesocket (*w_);
            wsa_assert (rc != SOCKET_ERROR);
            *w_ = INVALID_SOCKET;
        }
572
        //  Set errno from saved value
573
        errno = wsa_error_to_errno (saved_errno);
574 575
        return -1;
    }
576 577 578 579 580 581 582 583 584

#elif defined ZMQ_HAVE_OPENVMS

    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
    //  can lead to performance problems.
    //
    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
    //  create the socket pair manually.
585
    struct sockaddr_in lcladdr;
Martin Hurton's avatar
Martin Hurton committed
586
    memset (&lcladdr, 0, sizeof lcladdr);
587 588 589 590
    lcladdr.sin_family = AF_INET;
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;

591
    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
592 593 594
    errno_assert (listener != -1);

    int on = 1;
Martin Hurton's avatar
Martin Hurton committed
595
    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
596 597
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
598
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
599 600
    errno_assert (rc != -1);

Pieter Hintjens's avatar
Pieter Hintjens committed
601
    rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
602 603
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
604
    socklen_t lcladdr_len = sizeof lcladdr;
605

Pieter Hintjens's avatar
Pieter Hintjens committed
606
    rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
607 608 609 610 611
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

612
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
613 614
    errno_assert (*w_ != -1);

Martin Hurton's avatar
Martin Hurton committed
615
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
616 617
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
618
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
619 620
    errno_assert (rc != -1);

Pieter Hintjens's avatar
Pieter Hintjens committed
621
    rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
622 623 624 625 626 627 628 629 630
    errno_assert (rc != -1);

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    close (listener);

    return 0;

Pieter Hintjens's avatar
Pieter Hintjens committed
631 632
#else
    // All other implementations support socketpair()
633 634
    int sv [2];
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
Pieter Hintjens's avatar
Pieter Hintjens committed
635 636
    if (rc == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
637
        *w_ = *r_ = -1;
Pieter Hintjens's avatar
Pieter Hintjens committed
638 639 640 641 642 643 644
        return -1;
    }
    else {
        *w_ = sv [0];
        *r_ = sv [1];
        return 0;
    }
645 646
#endif
}