signaler.cpp 19.1 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "poller.hpp"
32 33 34 35 36

//  On AIX, poll.h has to be included before zmq.h to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
37
#if defined ZMQ_POLL_BASED_ON_POLL
38
#include <poll.h>
39
#elif defined ZMQ_POLL_BASED_ON_SELECT
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#elif defined ZMQ_HAVE_HPUX
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
#else
#include <sys/select.h>
#endif
#endif

#include "signaler.hpp"
#include "likely.hpp"
56
#include "stdint.hpp"
57
#include "config.hpp"
58 59 60 61
#include "err.hpp"
#include "fd.hpp"
#include "ip.hpp"

62 63 64 65
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#endif

66 67 68 69 70 71 72 73 74 75
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#endif

76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
#if !defined (ZMQ_HAVE_WINDOWS)
// Helper to sleep for specific number of milliseconds (or until signal)
//
static int sleep_ms (unsigned int ms_)
{
    if (ms_ == 0)
        return 0;
#if defined ZMQ_HAVE_WINDOWS
    Sleep (ms_ > 0 ? ms_ : INFINITE);
    return 0;
#elif defined ZMQ_HAVE_ANDROID
    usleep (ms_ * 1000);
    return 0;
#else
    return usleep (ms_ * 1000);
#endif
}

// Helper to wait on close(), for non-blocking sockets, until it completes
// If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
// the overall timeout is reached.
//
static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
{
    unsigned int ms_so_far = 0;
    unsigned int step_ms   = max_ms_ / 10;
    if (step_ms < 1)
        step_ms = 1;
    if (step_ms > 100)
        step_ms = 100;

    int rc = 0;       // do not sleep on first attempt
Pieter Hintjens's avatar
Pieter Hintjens committed
108 109
    do {
        if (rc == -1 && errno == EAGAIN) {
110 111 112 113 114 115 116 117 118 119
            sleep_ms (step_ms);
            ms_so_far += step_ms;
        }
        rc = close (fd_);
    } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);

    return rc;
}
#endif

120 121 122
zmq::signaler_t::signaler_t ()
{
    //  Create the socketpair for signaling.
123 124 125 126
    if (make_fdpair (&r, &w) == 0) {
        unblock_socket (w);
        unblock_socket (r);
    }
127
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
128
    pid = getpid ();
129
#endif
130 131
}

132 133
// This might get run after some part of construction failed, leaving one or
// both of r and w retired_fd.
134 135
zmq::signaler_t::~signaler_t ()
{
136
#if defined ZMQ_HAVE_EVENTFD
137
    if (r == retired_fd) return;
138
    int rc = close_wait_ms (r);
139 140
    errno_assert (rc == 0);
#elif defined ZMQ_HAVE_WINDOWS
141 142 143 144 145 146 147 148 149 150 151 152 153
    if (w != retired_fd) {
        const struct linger so_linger = { 1, 0 };
        int rc = setsockopt (w, SOL_SOCKET, SO_LINGER,
            (const char *) &so_linger, sizeof so_linger);
        //  Only check shutdown if WSASTARTUP was previously done
        if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
            wsa_assert (rc != SOCKET_ERROR);
            rc = closesocket (w);
            wsa_assert (rc != SOCKET_ERROR);
            if (r == retired_fd) return;
            rc = closesocket (r);
            wsa_assert (rc != SOCKET_ERROR);
        }
Richard Newton's avatar
Richard Newton committed
154
    }
155
#else
156 157 158 159 160
    if (w != retired_fd) {
        int rc = close_wait_ms (w);
        errno_assert (rc == 0);
    }
    if (r != retired_fd) {
161
        int rc = close_wait_ms (r);
162 163
        errno_assert (rc == 0);
    }
164 165 166
#endif
}

Martin Hurton's avatar
Martin Hurton committed
167
zmq::fd_t zmq::signaler_t::get_fd () const
168 169 170 171 172 173
{
    return r;
}

void zmq::signaler_t::send ()
{
Martin Hurton's avatar
Martin Hurton committed
174 175
#if defined HAVE_FORK
    if (unlikely (pid != getpid ())) {
176 177 178 179
        //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
        return; // do not send anything in forked child context
    }
#endif
180 181 182 183 184
#if defined ZMQ_HAVE_EVENTFD
    const uint64_t inc = 1;
    ssize_t sz = write (w, &inc, sizeof (inc));
    errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS
185
    unsigned char dummy = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
186
    int nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0);
187 188 189 190 191 192 193 194
    wsa_assert (nbytes != SOCKET_ERROR);
    zmq_assert (nbytes == sizeof (dummy));
#else
    unsigned char dummy = 0;
    while (true) {
        ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
195
#if defined(HAVE_FORK)
Martin Hurton's avatar
Martin Hurton committed
196
        if (unlikely (pid != getpid ())) {
197 198 199 200 201
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
Martin Hurton's avatar
Martin Hurton committed
202
        zmq_assert (nbytes == sizeof dummy);
203 204 205 206 207 208 209
        break;
    }
#endif
}

int zmq::signaler_t::wait (int timeout_)
{
210
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
211
    if (unlikely (pid != getpid ())) {
212
        // we have forked and the file descriptor is closed. Emulate an interrupt
213 214 215 216 217 218 219
        // response.
        //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
        errno = EINTR;
        return -1;
    }
#endif

220
#ifdef ZMQ_POLL_BASED_ON_POLL
221 222 223 224 225
    struct pollfd pfd;
    pfd.fd = r;
    pfd.events = POLLIN;
    int rc = poll (&pfd, 1, timeout_);
    if (unlikely (rc < 0)) {
226
        errno_assert (errno == EINTR);
227 228
        return -1;
    }
229 230
    else
    if (unlikely (rc == 0)) {
231 232 233
        errno = EAGAIN;
        return -1;
    }
234
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
235 236
    else
    if (unlikely (pid != getpid ())) {
237
        // we have forked and the file descriptor is closed. Emulate an interrupt
238 239 240 241 242 243
        // response.
        //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
        errno = EINTR;
        return -1;
    }
#endif
244 245 246 247
    zmq_assert (rc == 1);
    zmq_assert (pfd.revents & POLLIN);
    return 0;

248
#elif defined ZMQ_POLL_BASED_ON_SELECT
249 250 251 252 253

    fd_set fds;
    FD_ZERO (&fds);
    FD_SET (r, &fds);
    struct timeval timeout;
254 255 256 257
    if (timeout_ >= 0) {
        timeout.tv_sec = timeout_ / 1000;
        timeout.tv_usec = timeout_ % 1000 * 1000;
    }
258
#ifdef ZMQ_HAVE_WINDOWS
259 260
    int rc = select (0, &fds, NULL, NULL,
        timeout_ >= 0 ? &timeout : NULL);
261 262
    wsa_assert (rc != SOCKET_ERROR);
#else
263 264
    int rc = select (r + 1, &fds, NULL, NULL,
        timeout_ >= 0 ? &timeout : NULL);
265
    if (unlikely (rc < 0)) {
266
        errno_assert (errno == EINTR);
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
        return -1;
    }
#endif
    if (unlikely (rc == 0)) {
        errno = EAGAIN;
        return -1;
    }
    zmq_assert (rc == 1);
    return 0;

#else
#error
#endif
}

void zmq::signaler_t::recv ()
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
{
    //  Attempt to read a signal.
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
    errno_assert (sz == sizeof (dummy));

    //  If we accidentally grabbed the next signal(s) along with the current
    //  one, return it back to the eventfd object.
    if (unlikely (dummy > 1)) {
        const uint64_t inc = dummy - 1;
        ssize_t sz2 = write (w, &inc, sizeof (inc));
        errno_assert (sz2 == sizeof (inc));
        return;
    }

    zmq_assert (dummy == 1);
#else
    unsigned char dummy;
#if defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
303
    int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
304 305 306 307 308 309 310 311 312 313 314
    wsa_assert (nbytes != SOCKET_ERROR);
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
    errno_assert (nbytes >= 0);
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
#endif
}

int zmq::signaler_t::recv_failable ()
315 316
{
    //  Attempt to read a signal.
317 318 319
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
320 321
    if (sz == -1) {
        errno_assert (errno == EAGAIN);
322
        return -1;
323
    }
324 325 326 327 328 329 330 331 332
    else {
        errno_assert (sz == sizeof (dummy));

        //  If we accidentally grabbed the next signal(s) along with the current
        //  one, return it back to the eventfd object.
        if (unlikely (dummy > 1)) {
            const uint64_t inc = dummy - 1;
            ssize_t sz2 = write (w, &inc, sizeof (inc));
            errno_assert (sz2 == sizeof (inc));
333
            return 0;
334
        }
335

336 337
        zmq_assert (dummy == 1);
    }
338
#else
339
    unsigned char dummy;
Martin Sustrik's avatar
Martin Sustrik committed
340
#if defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
341
    int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
342 343 344 345 346 347 348 349
    if (nbytes == SOCKET_ERROR) {
		const int last_error = WSAGetLastError();
		if (last_error == WSAEWOULDBLOCK) {
			errno = EAGAIN;
            return -1;
		}
        wsa_assert (last_error == WSAEWOULDBLOCK);
    }
350 351
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
352 353 354 355 356 357 358
    if (nbytes == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            errno = EAGAIN;
            return -1;
        }
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
    }
359 360 361
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
362
#endif
363
    return 0;
364 365
}

366
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
367
void zmq::signaler_t::forked ()
368
{
369 370 371
    //  Close file descriptors created in the parent and create new pair
    close (r);
    close (w);
372
    make_fdpair (&r, &w);
373 374 375
}
#endif

Pieter Hintjens's avatar
Pieter Hintjens committed
376
//  Returns -1 if we could not make the socket pair successfully
377 378
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
{
379 380
#if defined ZMQ_HAVE_EVENTFD
    fd_t fd = eventfd (0, 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
381 382 383 384 385 386 387 388 389
    if (fd == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
        *w_ = *r_ = -1;
        return -1;
    }
    else {
        *w_ = *r_ = fd;
        return 0;
    }
390 391

#elif defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
392
#   if !defined _WIN32_WCE
393
    // Windows CE does not manage security attributes
Matt Arsenault's avatar
Matt Arsenault committed
394 395
    SECURITY_DESCRIPTOR sd;
    SECURITY_ATTRIBUTES sa;
Martin Hurton's avatar
Martin Hurton committed
396 397
    memset (&sd, 0, sizeof sd);
    memset (&sa, 0, sizeof sa);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
398

Martin Hurton's avatar
Martin Hurton committed
399 400
    InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
    SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
401

Martin Hurton's avatar
Martin Hurton committed
402
    sa.nLength = sizeof (SECURITY_ATTRIBUTES);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
403
    sa.lpSecurityDescriptor = &sd;
Pieter Hintjens's avatar
Pieter Hintjens committed
404
#   endif
405

406 407 408 409
    //  This function has to be in a system-wide critical section so that
    //  two instances of the library don't accidentally create signaler
    //  crossing the process boundary.
    //  We'll use named event object to implement the critical section.
Martin Hurton's avatar
Martin Hurton committed
410 411 412
    //  Note that if the event object already exists, the CreateEvent requests
    //  EVENT_ALL_ACCESS access right. If this fails, we try to open
    //  the event object asking for SYNCHRONIZE access only.
413 414 415
    HANDLE sync = NULL;

    //  Create critical section only if using fixed signaler port
416 417 418 419 420
    //  Use problematic Event implementation for compatibility if using old port 5905.
    //  Otherwise use Mutex implementation.
    int event_signaler_port = 5905;

    if (signaler_port == event_signaler_port) {
421
#       if !defined _WIN32_WCE
422
        sync = CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
423
#       else
424
        sync = CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
425 426
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
427
            sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE,
428
                              FALSE, L"Global\\zmq-signaler-port-sync");
429 430 431

        win_assert (sync != NULL);
    }
Martin Hurton's avatar
Martin Hurton committed
432 433 434
    else
    if (signaler_port != 0) {
        wchar_t mutex_name [MAX_PATH];
435 436 437
#       ifdef __MINGW32__
        _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
#       else
Martin Hurton's avatar
Martin Hurton committed
438
        swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
439
#       endif
440 441

#       if !defined _WIN32_WCE
442
        sync = CreateMutexW (&sa, FALSE, mutex_name);
443
#       else
444
        sync = CreateMutexW (NULL, FALSE, mutex_name);
445 446
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
447
            sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
448 449 450

        win_assert (sync != NULL);
    }
451

452 453 454 455 456 457 458
    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
    //  handles cannot be polled on. Here we create the socketpair by hand.
    *w_ = INVALID_SOCKET;
    *r_ = INVALID_SOCKET;

    //  Create listening socket.
    SOCKET listener;
459
    listener = open_socket (AF_INET, SOCK_STREAM, 0);
460 461 462 463 464
    wsa_assert (listener != INVALID_SOCKET);

    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
    BOOL so_reuseaddr = 1;
    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
Pieter Hintjens's avatar
Pieter Hintjens committed
465
        (char *) &so_reuseaddr, sizeof so_reuseaddr);
466 467 468
    wsa_assert (rc != SOCKET_ERROR);
    BOOL tcp_nodelay = 1;
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
Pieter Hintjens's avatar
Pieter Hintjens committed
469
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
470 471
    wsa_assert (rc != SOCKET_ERROR);

472
    //  Init sockaddr to signaler port.
473
    struct sockaddr_in addr;
Martin Hurton's avatar
Martin Hurton committed
474
    memset (&addr, 0, sizeof addr);
475 476
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
477
    addr.sin_port = htons (signaler_port);
478 479

    //  Create the writer socket.
480
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
481 482 483 484
    wsa_assert (*w_ != INVALID_SOCKET);

    //  Set TCP_NODELAY on writer socket.
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
Martin Hurton's avatar
Martin Hurton committed
485
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
486 487
    wsa_assert (rc != SOCKET_ERROR);

488 489 490
    if (sync != NULL) {
        //  Enter the critical section.
        DWORD dwrc = WaitForSingleObject (sync, INFINITE);
491
        zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
492
    }
493 494

    //  Bind listening socket to signaler port.
Pieter Hintjens's avatar
Pieter Hintjens committed
495
    rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr);
496

497 498
    if (rc != SOCKET_ERROR && signaler_port == 0) {
        //  Retrieve ephemeral port number
Martin Hurton's avatar
Martin Hurton committed
499
        int addrlen = sizeof addr;
Pieter Hintjens's avatar
Pieter Hintjens committed
500
        rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen);
501 502
    }

503 504 505 506
    //  Listen for incoming connections.
    if (rc != SOCKET_ERROR)
        rc = listen (listener, 1);

507
    //  Connect writer to the listener.
508
    if (rc != SOCKET_ERROR)
Pieter Hintjens's avatar
Pieter Hintjens committed
509
        rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr);
510

511 512
    //  Accept connection from writer.
    if (rc != SOCKET_ERROR)
513
        *r_ = accept (listener, NULL, NULL);
514

515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
    //  Send/receive large chunk to work around TCP slow start
    //  This code is a workaround for #1608
    if (*r_ != INVALID_SOCKET) {
        size_t dummy_size = 1024 * 1024;        //  1M to overload default receive buffer
        unsigned char *dummy = (unsigned char *) malloc (dummy_size);
        int still_to_send = (int) dummy_size;
        int still_to_recv = (int) dummy_size;
        while (still_to_send || still_to_recv) {
            int nbytes;
            if (still_to_send > 0) {
                nbytes = ::send (*w_, (char *) (dummy + dummy_size - still_to_send), still_to_send, 0);
                wsa_assert (nbytes != SOCKET_ERROR);
                still_to_send -= nbytes;
            }
            nbytes = ::recv (*r_, (char *) (dummy + dummy_size - still_to_recv), still_to_recv, 0);
            wsa_assert (nbytes != SOCKET_ERROR);
            still_to_recv -= nbytes;
        }
        free (dummy);
    }

536 537 538 539 540
    //  Save errno if error occurred in bind/listen/connect/accept.
    int saved_errno = 0;
    if (*r_ == INVALID_SOCKET)
        saved_errno = WSAGetLastError ();

541
    //  We don't need the listening socket anymore. Close it.
542
    rc = closesocket (listener);
543
    wsa_assert(rc != SOCKET_ERROR);
544

545 546
    if (sync != NULL) {
        //  Exit the critical section.
547 548 549 550 551
        BOOL brc;
        if (signaler_port == event_signaler_port)
            brc = SetEvent (sync);
        else
            brc = ReleaseMutex (sync);
552
        win_assert (brc != 0);
553

554 555 556 557
        //  Release the kernel object
        brc = CloseHandle (sync);
        win_assert (brc != 0);
    }
558

559
    if (*r_ != INVALID_SOCKET) {
Pieter Hintjens's avatar
Pieter Hintjens committed
560
#   if !defined _WIN32_WCE
561
        //  On Windows, preventing sockets to be inherited by child processes.
562
        BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
563
        win_assert (brc);
Pieter Hintjens's avatar
Pieter Hintjens committed
564
#   endif
565
        return 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
566 567
    }
    else {
568
        //  Cleanup writer if connection failed
569 570 571 572 573
        if (*w_ != INVALID_SOCKET) {
            rc = closesocket (*w_);
            wsa_assert (rc != SOCKET_ERROR);
            *w_ = INVALID_SOCKET;
        }
574
        //  Set errno from saved value
575
        errno = wsa_error_to_errno (saved_errno);
576 577
        return -1;
    }
578 579 580 581 582 583 584 585 586

#elif defined ZMQ_HAVE_OPENVMS

    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
    //  can lead to performance problems.
    //
    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
    //  create the socket pair manually.
587
    struct sockaddr_in lcladdr;
Martin Hurton's avatar
Martin Hurton committed
588
    memset (&lcladdr, 0, sizeof lcladdr);
589 590 591 592
    lcladdr.sin_family = AF_INET;
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;

593
    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
594 595 596
    errno_assert (listener != -1);

    int on = 1;
Martin Hurton's avatar
Martin Hurton committed
597
    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
598 599
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
600
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
601 602
    errno_assert (rc != -1);

Pieter Hintjens's avatar
Pieter Hintjens committed
603
    rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
604 605
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
606
    socklen_t lcladdr_len = sizeof lcladdr;
607

Pieter Hintjens's avatar
Pieter Hintjens committed
608
    rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
609 610 611 612 613
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

614
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
615 616
    errno_assert (*w_ != -1);

Martin Hurton's avatar
Martin Hurton committed
617
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
618 619
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
620
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
621 622
    errno_assert (rc != -1);

Pieter Hintjens's avatar
Pieter Hintjens committed
623
    rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
624 625 626 627 628 629 630 631 632
    errno_assert (rc != -1);

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    close (listener);

    return 0;

Pieter Hintjens's avatar
Pieter Hintjens committed
633 634
#else
    // All other implementations support socketpair()
635 636
    int sv [2];
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
Pieter Hintjens's avatar
Pieter Hintjens committed
637 638
    if (rc == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
639
        *w_ = *r_ = -1;
Pieter Hintjens's avatar
Pieter Hintjens committed
640 641 642 643 644 645 646
        return -1;
    }
    else {
        *w_ = sv [0];
        *r_ = sv [1];
        return 0;
    }
647 648
#endif
}