/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "poller.hpp"
32 33 34 35 36

//  On AIX, poll.h has to be included before zmq.h to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
37
#if defined ZMQ_POLL_BASED_ON_POLL
38
#include <poll.h>
39
#elif defined ZMQ_POLL_BASED_ON_SELECT
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
#if defined ZMQ_HAVE_WINDOWS
#elif defined ZMQ_HAVE_HPUX
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
#else
#include <sys/select.h>
#endif
#endif

#include "signaler.hpp"
#include "likely.hpp"
55
#include "stdint.hpp"
56
#include "config.hpp"
57 58 59 60
#include "err.hpp"
#include "fd.hpp"
#include "ip.hpp"

61 62 63 64
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#endif

65
#if !defined ZMQ_HAVE_WINDOWS
66 67 68 69 70 71
#include <unistd.h>
#include <netinet/tcp.h>
#include <sys/types.h>
#include <sys/socket.h>
#endif

72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
#if !defined (ZMQ_HAVE_WINDOWS)
//  Pause the calling thread for ms_ milliseconds (the wait may end early
//  if a signal arrives). A request for 0 ms returns 0 without sleeping.
//  Returns 0 on Windows and Android; elsewhere returns whatever usleep()
//  reported.
static int sleep_ms (unsigned int ms_)
{
    //  Nothing to wait for.
    if (!ms_)
        return 0;

#if defined ZMQ_HAVE_WINDOWS
    //  ms_ is known to be non-zero at this point.
    Sleep (ms_);
    return 0;
#else
    const unsigned int usec = ms_ * 1000;
#if defined ZMQ_HAVE_ANDROID
    //  The result of usleep is intentionally not propagated here.
    usleep (usec);
    return 0;
#else
    return usleep (usec);
#endif
#endif
}

// Helper to wait on close(), for non-blocking sockets, until it completes
// If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
// the overall timeout is reached.
//
static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
{
    unsigned int ms_so_far = 0;
    unsigned int step_ms   = max_ms_ / 10;
    if (step_ms < 1)
        step_ms = 1;
    if (step_ms > 100)
        step_ms = 100;

    int rc = 0;       // do not sleep on first attempt
Pieter Hintjens's avatar
Pieter Hintjens committed
104 105
    do {
        if (rc == -1 && errno == EAGAIN) {
106 107 108 109 110 111 112 113 114 115
            sleep_ms (step_ms);
            ms_so_far += step_ms;
        }
        rc = close (fd_);
    } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);

    return rc;
}
#endif

116 117 118
zmq::signaler_t::signaler_t ()
{
    //  Build the descriptor pair used for signaling. When make_fdpair
    //  fails, r and w are left exactly as it set them.
    const int rc = make_fdpair (&r, &w);
    if (rc == 0) {
        //  Pass both ends through unblock_socket (defined elsewhere;
        //  presumably switches them to non-blocking mode).
        unblock_socket (w);
        unblock_socket (r);
    }
#ifdef HAVE_FORK
    //  Remember the creating process so that later calls can detect that
    //  they are running in a forked child.
    pid = getpid ();
#endif
}

128 129
// This might get run after some part of construction failed, leaving one or
// both of r and w retired_fd.
130 131
zmq::signaler_t::~signaler_t ()
{
132
#if defined ZMQ_HAVE_EVENTFD
133
    if (r == retired_fd) return;
134
    int rc = close_wait_ms (r);
135 136
    errno_assert (rc == 0);
#elif defined ZMQ_HAVE_WINDOWS
137 138 139 140 141 142 143 144 145 146 147 148 149
    if (w != retired_fd) {
        const struct linger so_linger = { 1, 0 };
        int rc = setsockopt (w, SOL_SOCKET, SO_LINGER,
            (const char *) &so_linger, sizeof so_linger);
        //  Only check shutdown if WSASTARTUP was previously done
        if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
            wsa_assert (rc != SOCKET_ERROR);
            rc = closesocket (w);
            wsa_assert (rc != SOCKET_ERROR);
            if (r == retired_fd) return;
            rc = closesocket (r);
            wsa_assert (rc != SOCKET_ERROR);
        }
Richard Newton's avatar
Richard Newton committed
150
    }
151
#else
152 153 154 155 156
    if (w != retired_fd) {
        int rc = close_wait_ms (w);
        errno_assert (rc == 0);
    }
    if (r != retired_fd) {
157
        int rc = close_wait_ms (r);
158 159
        errno_assert (rc == 0);
    }
160 161 162
#endif
}

Martin Hurton's avatar
Martin Hurton committed
163
//  Accessor for the read end of the signaler (the descriptor that wait()
//  and recv() operate on).
zmq::fd_t zmq::signaler_t::get_fd () const
{
    return r;
}

void zmq::signaler_t::send ()
{
Martin Hurton's avatar
Martin Hurton committed
170 171
#if defined HAVE_FORK
    if (unlikely (pid != getpid ())) {
172 173 174 175
        //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
        return; // do not send anything in forked child context
    }
#endif
176 177 178 179 180
#if defined ZMQ_HAVE_EVENTFD
    const uint64_t inc = 1;
    ssize_t sz = write (w, &inc, sizeof (inc));
    errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS
181
    unsigned char dummy = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
182
    int nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0);
183 184 185 186 187 188 189 190
    wsa_assert (nbytes != SOCKET_ERROR);
    zmq_assert (nbytes == sizeof (dummy));
#else
    unsigned char dummy = 0;
    while (true) {
        ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
191
#if defined(HAVE_FORK)
Martin Hurton's avatar
Martin Hurton committed
192
        if (unlikely (pid != getpid ())) {
193 194 195 196 197
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
Martin Hurton's avatar
Martin Hurton committed
198
        zmq_assert (nbytes == sizeof dummy);
199 200 201 202 203 204 205
        break;
    }
#endif
}

//  Wait until the read end becomes readable or timeout_ milliseconds pass.
//  Returns 0 when a signal is available. Returns -1 with errno set to
//  EAGAIN on timeout, or to EINTR when the wait was interrupted or when
//  running in a forked child. In the select-based variant a negative
//  timeout_ means no timeout is passed to select; the poll-based variant
//  hands timeout_ to poll unchanged.
int zmq::signaler_t::wait (int timeout_)
{
#ifdef HAVE_FORK
    if (unlikely (pid != getpid ())) {
        //  We have forked and the file descriptor is closed. Emulate an
        //  interrupt response.
        errno = EINTR;
        return -1;
    }
#endif

#ifdef ZMQ_POLL_BASED_ON_POLL
    struct pollfd pfd;
    pfd.fd = r;
    pfd.events = POLLIN;
    const int rc = poll (&pfd, 1, timeout_);

    //  Failure: the only tolerated cause is an interrupting signal.
    if (unlikely (rc < 0)) {
        errno_assert (errno == EINTR);
        return -1;
    }

    //  Timeout expired with nothing to read.
    if (unlikely (rc == 0)) {
        errno = EAGAIN;
        return -1;
    }

#ifdef HAVE_FORK
    if (unlikely (pid != getpid ())) {
        //  We have forked and the file descriptor is closed. Emulate an
        //  interrupt response.
        errno = EINTR;
        return -1;
    }
#endif
    zmq_assert (rc == 1);
    zmq_assert (pfd.revents & POLLIN);
    return 0;

#elif defined ZMQ_POLL_BASED_ON_SELECT

    fd_set fds;
    FD_ZERO (&fds);
    FD_SET (r, &fds);

    //  A negative timeout_ leaves the pointer NULL.
    struct timeval timeout;
    struct timeval *timeout_ptr = NULL;
    if (timeout_ >= 0) {
        timeout.tv_sec = timeout_ / 1000;
        timeout.tv_usec = timeout_ % 1000 * 1000;
        timeout_ptr = &timeout;
    }
#ifdef ZMQ_HAVE_WINDOWS
    int rc = select (0, &fds, NULL, NULL, timeout_ptr);
    wsa_assert (rc != SOCKET_ERROR);
#else
    int rc = select (r + 1, &fds, NULL, NULL, timeout_ptr);
    if (unlikely (rc < 0)) {
        errno_assert (errno == EINTR);
        return -1;
    }
#endif
    if (unlikely (rc == 0)) {
        errno = EAGAIN;
        return -1;
    }
    zmq_assert (rc == 1);
    return 0;

#else
#error
#endif
}

void zmq::signaler_t::recv ()
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
{
    //  Attempt to read a signal.
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
    errno_assert (sz == sizeof (dummy));

    //  If we accidentally grabbed the next signal(s) along with the current
    //  one, return it back to the eventfd object.
    if (unlikely (dummy > 1)) {
        const uint64_t inc = dummy - 1;
        ssize_t sz2 = write (w, &inc, sizeof (inc));
        errno_assert (sz2 == sizeof (inc));
        return;
    }

    zmq_assert (dummy == 1);
#else
    unsigned char dummy;
#if defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
299
    int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
300 301 302 303 304 305 306 307 308 309 310
    wsa_assert (nbytes != SOCKET_ERROR);
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
    errno_assert (nbytes >= 0);
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
#endif
}

int zmq::signaler_t::recv_failable ()
311 312
{
    //  Attempt to read a signal.
313 314 315
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
316 317
    if (sz == -1) {
        errno_assert (errno == EAGAIN);
318
        return -1;
319
    }
320 321 322 323 324 325 326 327 328
    else {
        errno_assert (sz == sizeof (dummy));

        //  If we accidentally grabbed the next signal(s) along with the current
        //  one, return it back to the eventfd object.
        if (unlikely (dummy > 1)) {
            const uint64_t inc = dummy - 1;
            ssize_t sz2 = write (w, &inc, sizeof (inc));
            errno_assert (sz2 == sizeof (inc));
329
            return 0;
330
        }
331

332 333
        zmq_assert (dummy == 1);
    }
334
#else
335
    unsigned char dummy;
Martin Sustrik's avatar
Martin Sustrik committed
336
#if defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
337
    int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
338 339 340 341 342 343 344 345
    if (nbytes == SOCKET_ERROR) {
		const int last_error = WSAGetLastError();
		if (last_error == WSAEWOULDBLOCK) {
			errno = EAGAIN;
            return -1;
		}
        wsa_assert (last_error == WSAEWOULDBLOCK);
    }
346 347
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
348 349 350 351 352 353 354
    if (nbytes == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            errno = EAGAIN;
            return -1;
        }
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
    }
355 356 357
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
358
#endif
359
    return 0;
360 361
}

362
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
363
void zmq::signaler_t::forked ()
364
{
365 366 367
    //  Close file descriptors created in the parent and create new pair
    close (r);
    close (w);
368
    make_fdpair (&r, &w);
369 370 371
}
#endif

Pieter Hintjens's avatar
Pieter Hintjens committed
372
//  Returns -1 if we could not make the socket pair successfully
373 374
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
{
375 376
#if defined ZMQ_HAVE_EVENTFD
    fd_t fd = eventfd (0, 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
377 378 379 380 381 382 383 384 385
    if (fd == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
        *w_ = *r_ = -1;
        return -1;
    }
    else {
        *w_ = *r_ = fd;
        return 0;
    }
386 387

#elif defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
388
#   if !defined _WIN32_WCE
389
    // Windows CE does not manage security attributes
Matt Arsenault's avatar
Matt Arsenault committed
390 391
    SECURITY_DESCRIPTOR sd;
    SECURITY_ATTRIBUTES sa;
Martin Hurton's avatar
Martin Hurton committed
392 393
    memset (&sd, 0, sizeof sd);
    memset (&sa, 0, sizeof sa);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
394

Martin Hurton's avatar
Martin Hurton committed
395 396
    InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
    SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
397

Martin Hurton's avatar
Martin Hurton committed
398
    sa.nLength = sizeof (SECURITY_ATTRIBUTES);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
399
    sa.lpSecurityDescriptor = &sd;
Pieter Hintjens's avatar
Pieter Hintjens committed
400
#   endif
401

402 403 404 405
    //  This function has to be in a system-wide critical section so that
    //  two instances of the library don't accidentally create signaler
    //  crossing the process boundary.
    //  We'll use named event object to implement the critical section.
Martin Hurton's avatar
Martin Hurton committed
406 407 408
    //  Note that if the event object already exists, the CreateEvent requests
    //  EVENT_ALL_ACCESS access right. If this fails, we try to open
    //  the event object asking for SYNCHRONIZE access only.
409 410 411
    HANDLE sync = NULL;

    //  Create critical section only if using fixed signaler port
412 413 414 415 416
    //  Use problematic Event implementation for compatibility if using old port 5905.
    //  Otherwise use Mutex implementation.
    int event_signaler_port = 5905;

    if (signaler_port == event_signaler_port) {
417
#       if !defined _WIN32_WCE
418
        sync = CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
419
#       else
420
        sync = CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
421 422
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
423
            sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE,
424
                              FALSE, L"Global\\zmq-signaler-port-sync");
425 426 427

        win_assert (sync != NULL);
    }
Martin Hurton's avatar
Martin Hurton committed
428 429 430
    else
    if (signaler_port != 0) {
        wchar_t mutex_name [MAX_PATH];
431 432 433
#       ifdef __MINGW32__
        _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
#       else
Martin Hurton's avatar
Martin Hurton committed
434
        swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
435
#       endif
436 437

#       if !defined _WIN32_WCE
438
        sync = CreateMutexW (&sa, FALSE, mutex_name);
439
#       else
440
        sync = CreateMutexW (NULL, FALSE, mutex_name);
441 442
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
443
            sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
444 445 446

        win_assert (sync != NULL);
    }
447

448 449 450 451 452 453 454
    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
    //  handles cannot be polled on. Here we create the socketpair by hand.
    *w_ = INVALID_SOCKET;
    *r_ = INVALID_SOCKET;

    //  Create listening socket.
    SOCKET listener;
455
    listener = open_socket (AF_INET, SOCK_STREAM, 0);
456 457 458 459 460
    wsa_assert (listener != INVALID_SOCKET);

    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
    BOOL so_reuseaddr = 1;
    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
Pieter Hintjens's avatar
Pieter Hintjens committed
461
        (char *) &so_reuseaddr, sizeof so_reuseaddr);
462 463 464
    wsa_assert (rc != SOCKET_ERROR);
    BOOL tcp_nodelay = 1;
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
Pieter Hintjens's avatar
Pieter Hintjens committed
465
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
466 467
    wsa_assert (rc != SOCKET_ERROR);

468
    //  Init sockaddr to signaler port.
469
    struct sockaddr_in addr;
Martin Hurton's avatar
Martin Hurton committed
470
    memset (&addr, 0, sizeof addr);
471 472
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
473
    addr.sin_port = htons (signaler_port);
474 475

    //  Create the writer socket.
476
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
477 478 479 480
    wsa_assert (*w_ != INVALID_SOCKET);

    //  Set TCP_NODELAY on writer socket.
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
Martin Hurton's avatar
Martin Hurton committed
481
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
482 483
    wsa_assert (rc != SOCKET_ERROR);

484 485 486
    if (sync != NULL) {
        //  Enter the critical section.
        DWORD dwrc = WaitForSingleObject (sync, INFINITE);
487
        zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
488
    }
489 490

    //  Bind listening socket to signaler port.
Pieter Hintjens's avatar
Pieter Hintjens committed
491
    rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr);
492

493 494
    if (rc != SOCKET_ERROR && signaler_port == 0) {
        //  Retrieve ephemeral port number
Martin Hurton's avatar
Martin Hurton committed
495
        int addrlen = sizeof addr;
Pieter Hintjens's avatar
Pieter Hintjens committed
496
        rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen);
497 498
    }

499 500 501 502
    //  Listen for incoming connections.
    if (rc != SOCKET_ERROR)
        rc = listen (listener, 1);

503
    //  Connect writer to the listener.
504
    if (rc != SOCKET_ERROR)
Pieter Hintjens's avatar
Pieter Hintjens committed
505
        rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr);
506

507 508
    //  Accept connection from writer.
    if (rc != SOCKET_ERROR)
509
        *r_ = accept (listener, NULL, NULL);
510

511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531
    //  Send/receive large chunk to work around TCP slow start
    //  This code is a workaround for #1608
    if (*r_ != INVALID_SOCKET) {
        size_t dummy_size = 1024 * 1024;        //  1M to overload default receive buffer
        unsigned char *dummy = (unsigned char *) malloc (dummy_size);
        int still_to_send = (int) dummy_size;
        int still_to_recv = (int) dummy_size;
        while (still_to_send || still_to_recv) {
            int nbytes;
            if (still_to_send > 0) {
                nbytes = ::send (*w_, (char *) (dummy + dummy_size - still_to_send), still_to_send, 0);
                wsa_assert (nbytes != SOCKET_ERROR);
                still_to_send -= nbytes;
            }
            nbytes = ::recv (*r_, (char *) (dummy + dummy_size - still_to_recv), still_to_recv, 0);
            wsa_assert (nbytes != SOCKET_ERROR);
            still_to_recv -= nbytes;
        }
        free (dummy);
    }

532 533 534 535 536
    //  Save errno if error occurred in bind/listen/connect/accept.
    int saved_errno = 0;
    if (*r_ == INVALID_SOCKET)
        saved_errno = WSAGetLastError ();

537
    //  We don't need the listening socket anymore. Close it.
538
    rc = closesocket (listener);
539
    wsa_assert(rc != SOCKET_ERROR);
540

541 542
    if (sync != NULL) {
        //  Exit the critical section.
543 544 545 546 547
        BOOL brc;
        if (signaler_port == event_signaler_port)
            brc = SetEvent (sync);
        else
            brc = ReleaseMutex (sync);
548
        win_assert (brc != 0);
549

550 551 552 553
        //  Release the kernel object
        brc = CloseHandle (sync);
        win_assert (brc != 0);
    }
554

555
    if (*r_ != INVALID_SOCKET) {
Pieter Hintjens's avatar
Pieter Hintjens committed
556
#   if !defined _WIN32_WCE
557
        //  On Windows, preventing sockets to be inherited by child processes.
558
        BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
559
        win_assert (brc);
Pieter Hintjens's avatar
Pieter Hintjens committed
560
#   endif
561
        return 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
562 563
    }
    else {
564
        //  Cleanup writer if connection failed
565 566 567 568 569
        if (*w_ != INVALID_SOCKET) {
            rc = closesocket (*w_);
            wsa_assert (rc != SOCKET_ERROR);
            *w_ = INVALID_SOCKET;
        }
570
        //  Set errno from saved value
571
        errno = wsa_error_to_errno (saved_errno);
572 573
        return -1;
    }
574 575 576 577 578 579 580 581 582

#elif defined ZMQ_HAVE_OPENVMS

    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
    //  can lead to performance problems.
    //
    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
    //  create the socket pair manually.
583
    struct sockaddr_in lcladdr;
Martin Hurton's avatar
Martin Hurton committed
584
    memset (&lcladdr, 0, sizeof lcladdr);
585 586 587 588
    lcladdr.sin_family = AF_INET;
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;

589
    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
590 591 592
    errno_assert (listener != -1);

    int on = 1;
Martin Hurton's avatar
Martin Hurton committed
593
    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
594 595
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
596
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
597 598
    errno_assert (rc != -1);

Pieter Hintjens's avatar
Pieter Hintjens committed
599
    rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
600 601
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
602
    socklen_t lcladdr_len = sizeof lcladdr;
603

Pieter Hintjens's avatar
Pieter Hintjens committed
604
    rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
605 606 607 608 609
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

610
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
611 612
    errno_assert (*w_ != -1);

Martin Hurton's avatar
Martin Hurton committed
613
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
614 615
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
616
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
617 618
    errno_assert (rc != -1);

Pieter Hintjens's avatar
Pieter Hintjens committed
619
    rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
620 621 622 623 624 625 626 627 628
    errno_assert (rc != -1);

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    close (listener);

    return 0;

Pieter Hintjens's avatar
Pieter Hintjens committed
629 630
#else
    // All other implementations support socketpair()
631 632
    int sv [2];
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
Pieter Hintjens's avatar
Pieter Hintjens committed
633 634
    if (rc == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
635
        *w_ = *r_ = -1;
Pieter Hintjens's avatar
Pieter Hintjens committed
636 637 638 639 640 641 642
        return -1;
    }
    else {
        *w_ = sv [0];
        *r_ = sv [1];
        return 0;
    }
643 644
#endif
}